Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a5165ab5
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
a5165ab5
编写于
12月 26, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: do some internal refactor.
上级
1edf1e17
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
85 addition
and
184 deletion
+85
-184
source/dnode/vnode/src/tsdb/tsdbMergeTree.c
source/dnode/vnode/src/tsdb/tsdbMergeTree.c
+0
-1
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+41
-102
source/libs/executor/src/dataDeleter.c
source/libs/executor/src/dataDeleter.c
+2
-3
source/libs/executor/src/dataDispatcher.c
source/libs/executor/src/dataDispatcher.c
+6
-7
source/libs/executor/src/exchangeoperator.c
source/libs/executor/src/exchangeoperator.c
+0
-1
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+3
-35
source/libs/executor/src/joinoperator.c
source/libs/executor/src/joinoperator.c
+29
-29
source/libs/executor/src/sortoperator.c
source/libs/executor/src/sortoperator.c
+0
-4
source/libs/executor/src/tsimplehash.c
source/libs/executor/src/tsimplehash.c
+3
-2
source/libs/executor/src/tsort.c
source/libs/executor/src/tsort.c
+1
-0
未找到文件。
source/dnode/vnode/src/tsdb/tsdbMergeTree.c
浏览文件 @
a5165ab5
...
...
@@ -567,7 +567,6 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
pMTree
->
pLoadInfo
=
pBlockLoadInfo
;
pMTree
->
destroyLoadInfo
=
destroyLoadInfo
;
ASSERT
(
pMTree
->
pLoadInfo
!=
NULL
);
for
(
int32_t
i
=
0
;
i
<
pFReader
->
pSet
->
nSttF
;
++
i
)
{
// open all last file
struct
SLDataIter
*
pIter
=
NULL
;
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
a5165ab5
...
...
@@ -243,7 +243,7 @@ static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pC
return
TSDB_CODE_SUCCESS
;
}
static
void
updateBlockSMAInfo
(
STSchema
*
pSchema
,
SBlockLoadSuppInfo
*
pSupInfo
)
{
static
int32_t
updateBlockSMAInfo
(
STSchema
*
pSchema
,
SBlockLoadSuppInfo
*
pSupInfo
)
{
int32_t
i
=
0
,
j
=
0
;
while
(
i
<
pSchema
->
numOfCols
&&
j
<
pSupInfo
->
numOfCols
)
{
...
...
@@ -251,7 +251,7 @@ static void updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo)
if
(
pTCol
->
colId
==
pSupInfo
->
colId
[
j
])
{
if
(
!
IS_BSMA_ON
(
pTCol
))
{
pSupInfo
->
smaValid
=
false
;
return
;
return
TSDB_CODE_SUCCESS
;
}
i
+=
1
;
...
...
@@ -260,9 +260,11 @@ static void updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo)
// do nothing
i
+=
1
;
}
else
{
ASSERT
(
0
)
;
return
TSDB_CODE_INVALID_PARA
;
}
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
initBlockScanInfoBuf
(
SBlockInfoBuf
*
pBuf
,
int32_t
numOfTables
)
{
...
...
@@ -579,7 +581,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
}
if
(
VND_IS_TSMA
(
pVnode
))
{
tsdbDebug
(
"vgId:%d, tsma is selected to query
"
,
TD_VID
(
pVnode
)
);
tsdbDebug
(
"vgId:%d, tsma is selected to query
, %s"
,
TD_VID
(
pVnode
),
idstr
);
}
initReaderStatus
(
&
pReader
->
status
);
...
...
@@ -594,7 +596,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
pReader
->
type
=
pCond
->
type
;
pReader
->
window
=
updateQueryTimeWindow
(
pReader
->
pTsdb
,
&
pCond
->
twindows
);
pReader
->
blockInfoBuf
.
numPerBucket
=
1000
;
// 1000 tables per bucket
ASSERT
(
pCond
->
numOfCols
>
0
);
if
(
pReader
->
pResBlock
==
NULL
)
{
pReader
->
freeBlock
=
true
;
...
...
@@ -605,6 +606,12 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
}
}
if
(
pCond
->
numOfCols
<=
0
)
{
tsdbError
(
"vgId:%d, invalid column number %d in query cond, %s"
,
TD_VID
(
pVnode
),
pCond
->
numOfCols
,
idstr
);
code
=
TSDB_CODE_INVALID_PARA
;
goto
_end
;
}
// todo refactor.
limitOutputBufferSize
(
pCond
,
&
pReader
->
capacity
);
...
...
@@ -794,8 +801,9 @@ static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_
}
static
SFileDataBlockInfo
*
getCurrentBlockInfo
(
SDataBlockIter
*
pBlockIter
)
{
if
(
taosArrayGetSize
(
pBlockIter
->
blockList
)
==
0
)
{
ASSERT
(
pBlockIter
->
numOfBlocks
==
taosArrayGetSize
(
pBlockIter
->
blockList
));
size_t
num
=
taosArrayGetSize
(
pBlockIter
->
blockList
);
if
(
num
==
0
)
{
ASSERT
(
pBlockIter
->
numOfBlocks
==
num
);
return
NULL
;
}
...
...
@@ -805,73 +813,6 @@ static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
static
SDataBlk
*
getCurrentBlock
(
SDataBlockIter
*
pBlockIter
)
{
return
&
pBlockIter
->
block
;
}
int32_t
binarySearchForTs
(
char
*
pValue
,
int
num
,
TSKEY
key
,
int
order
)
{
int32_t
midPos
=
-
1
;
int32_t
numOfRows
;
ASSERT
(
order
==
TSDB_ORDER_ASC
||
order
==
TSDB_ORDER_DESC
);
TSKEY
*
keyList
=
(
TSKEY
*
)
pValue
;
int32_t
firstPos
=
0
;
int32_t
lastPos
=
num
-
1
;
if
(
order
==
TSDB_ORDER_DESC
)
{
// find the first position which is smaller than the key
while
(
1
)
{
if
(
key
>=
keyList
[
firstPos
])
return
firstPos
;
if
(
key
==
keyList
[
lastPos
])
return
lastPos
;
if
(
key
<
keyList
[
lastPos
])
{
lastPos
+=
1
;
if
(
lastPos
>=
num
)
{
return
-
1
;
}
else
{
return
lastPos
;
}
}
numOfRows
=
lastPos
-
firstPos
+
1
;
midPos
=
(
numOfRows
>>
1
)
+
firstPos
;
if
(
key
<
keyList
[
midPos
])
{
firstPos
=
midPos
+
1
;
}
else
if
(
key
>
keyList
[
midPos
])
{
lastPos
=
midPos
-
1
;
}
else
{
break
;
}
}
}
else
{
// find the first position which is bigger than the key
while
(
1
)
{
if
(
key
<=
keyList
[
firstPos
])
return
firstPos
;
if
(
key
==
keyList
[
lastPos
])
return
lastPos
;
if
(
key
>
keyList
[
lastPos
])
{
lastPos
=
lastPos
+
1
;
if
(
lastPos
>=
num
)
return
-
1
;
else
return
lastPos
;
}
numOfRows
=
lastPos
-
firstPos
+
1
;
midPos
=
(
numOfRows
>>
1u
)
+
firstPos
;
if
(
key
<
keyList
[
midPos
])
{
lastPos
=
midPos
-
1
;
}
else
if
(
key
>
keyList
[
midPos
])
{
firstPos
=
midPos
+
1
;
}
else
{
break
;
}
}
}
return
midPos
;
}
static
int
doBinarySearchKey
(
TSKEY
*
keyList
,
int
num
,
int
pos
,
TSKEY
key
,
int
order
)
{
// start end position
int
s
,
e
;
...
...
@@ -972,8 +913,8 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo
int32_t
step
=
asc
?
1
:-
1
;
// make sure it is aligned to 8bit
ASSERT
((((
uint64_t
)
pColData
->
pData
)
&
(
0x8
-
1
))
==
0
);
// make sure it is aligned to 8bit
, the allocated memory address is aligned to 256bit
//
ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0);
// 1. copy data in a batch model
memcpy
(
pColData
->
pData
,
p
,
dumpedRows
*
tDataTypes
[
pData
->
type
].
bytes
);
...
...
@@ -1183,7 +1124,6 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
SFileDataBlockInfo
*
pBlockInfo
=
getCurrentBlockInfo
(
pBlockIter
);
SFileBlockDumpInfo
*
pDumpInfo
=
&
pReader
->
status
.
fBlockDumpInfo
;
ASSERT
(
pBlockInfo
!=
NULL
);
SDataBlk
*
pBlock
=
getCurrentBlock
(
pBlockIter
);
code
=
tsdbReadDataBlock
(
pReader
->
pFileReader
,
pBlock
,
pBlockData
);
...
...
@@ -1221,8 +1161,6 @@ static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
}
static
int32_t
initBlockOrderSupporter
(
SBlockOrderSupporter
*
pSup
,
int32_t
numOfTables
)
{
ASSERT
(
numOfTables
>=
1
);
pSup
->
numOfBlocksPerTable
=
taosMemoryCalloc
(
1
,
sizeof
(
int32_t
)
*
numOfTables
);
pSup
->
indexPerTable
=
taosMemoryCalloc
(
1
,
sizeof
(
int32_t
)
*
numOfTables
);
pSup
->
pDataBlockInfo
=
taosMemoryCalloc
(
1
,
POINTER_BYTES
*
numOfTables
);
...
...
@@ -1329,7 +1267,10 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
sup
.
numOfTables
+=
1
;
}
ASSERT
(
numOfBlocks
==
cnt
);
if
(
numOfBlocks
!=
cnt
&&
sup
.
numOfTables
!=
numOfTables
)
{
cleanupBlockOrderSupporter
(
&
sup
);
return
TSDB_CODE_INVALID_PARA
;
}
// since there is only one table qualified, blocks are not sorted
if
(
sup
.
numOfTables
==
1
)
{
...
...
@@ -1351,10 +1292,9 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
tsdbDebug
(
"%p create data blocks info struct completed, %d blocks in %d tables %s"
,
pReader
,
cnt
,
sup
.
numOfTables
,
pReader
->
idStr
);
ASSERT
(
cnt
<=
numOfBlocks
&&
sup
.
numOfTables
<=
numOfTables
);
SMultiwayMergeTreeInfo
*
pTree
=
NULL
;
uint8_t
ret
=
tMergeTreeCreate
(
&
pTree
,
sup
.
numOfTables
,
&
sup
,
fileDataBlockOrderCompar
);
uint8_t
ret
=
tMergeTreeCreate
(
&
pTree
,
sup
.
numOfTables
,
&
sup
,
fileDataBlockOrderCompar
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
cleanupBlockOrderSupporter
(
&
sup
);
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -1432,8 +1372,6 @@ static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBl
}
static
int32_t
findFileBlockInfoIndex
(
SDataBlockIter
*
pBlockIter
,
SFileDataBlockInfo
*
pFBlockInfo
)
{
ASSERT
(
pBlockIter
!=
NULL
&&
pFBlockInfo
!=
NULL
);
int32_t
step
=
ASCENDING_TRAVERSE
(
pBlockIter
->
order
)
?
1
:
-
1
;
int32_t
index
=
pBlockIter
->
index
;
...
...
@@ -1924,7 +1862,6 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
}
doMergeRowsInLastBlock
(
pLastBlockReader
,
pBlockScanInfo
,
tsLastBlock
,
&
merge
,
&
pReader
->
verRange
);
ASSERT
(
mergeBlockData
);
// merge with block data if ts == key
if
(
tsLastBlock
==
pBlockData
->
aTSKEY
[
pDumpInfo
->
rowIndex
])
{
...
...
@@ -1990,7 +1927,6 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
tRowMergerClear
(
&
merge
);
return
code
;
}
else
{
ASSERT
(
0
);
return
TSDB_CODE_SUCCESS
;
}
}
else
{
// desc order
...
...
@@ -2011,7 +1947,6 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
TSDBROW
*
pRow
=
getValidMemRow
(
&
pBlockScanInfo
->
iter
,
pDelList
,
pReader
);
TSDBROW
*
piRow
=
getValidMemRow
(
&
pBlockScanInfo
->
iiter
,
pDelList
,
pReader
);
ASSERT
(
pRow
!=
NULL
&&
piRow
!=
NULL
);
int64_t
tsLast
=
INT64_MIN
;
if
(
hasDataInLastBlock
(
pLastBlockReader
))
{
...
...
@@ -2235,7 +2170,6 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
if
(
pReader
->
pReadSnap
->
pMem
!=
NULL
)
{
d
=
tsdbGetTbDataFromMemTable
(
pReader
->
pReadSnap
->
pMem
,
pReader
->
suid
,
pBlockScanInfo
->
uid
);
if
(
d
!=
NULL
)
{
ASSERT
(
pBlockScanInfo
->
iter
.
iter
==
NULL
);
code
=
tsdbTbDataIterCreate
(
d
,
&
startKey
,
backward
,
&
pBlockScanInfo
->
iter
.
iter
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
pBlockScanInfo
->
iter
.
hasVal
=
(
tsdbTbDataIterGet
(
pBlockScanInfo
->
iter
.
iter
)
!=
NULL
);
...
...
@@ -2349,10 +2283,9 @@ static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
static
bool
hasDataInLastBlock
(
SLastBlockReader
*
pLastBlockReader
)
{
return
pLastBlockReader
->
mergeTree
.
pIter
!=
NULL
;
}
bool
hasDataInFileBlock
(
const
SBlockData
*
pBlockData
,
const
SFileBlockDumpInfo
*
pDumpInfo
)
{
if
(
pBlockData
->
nRow
>
0
)
{
ASSERT
(
pBlockData
->
nRow
==
pDumpInfo
->
totalRows
);
if
(
(
pBlockData
->
nRow
>
0
)
&&
(
pBlockData
->
nRow
!=
pDumpInfo
->
totalRows
)
)
{
return
false
;
// this is an invalid result.
}
return
pBlockData
->
nRow
>
0
&&
(
!
pDumpInfo
->
allDumped
);
}
...
...
@@ -2583,7 +2516,6 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
int32_t
code
=
0
;
SArray
*
pDelData
=
taosArrayInit
(
4
,
sizeof
(
SDelData
));
ASSERT
(
pReader
->
pReadSnap
!=
NULL
);
SDelFile
*
pDelFile
=
pReader
->
pReadSnap
->
fs
.
pDelFile
;
if
(
pDelFile
&&
taosArrayGetSize
(
pReader
->
pDelIdx
)
>
0
)
{
...
...
@@ -2868,7 +2800,6 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
TSDBKEY
keyInBuf
=
getCurrentKeyInBuf
(
pScanInfo
,
pReader
);
if
(
pBlockInfo
==
NULL
)
{
// build data block from last data file
ASSERT
(
pBlockIter
->
numOfBlocks
==
0
);
code
=
buildComposedDataBlock
(
pReader
);
}
else
if
(
fileBlockShouldLoad
(
pReader
,
pBlockInfo
,
pBlock
,
pScanInfo
,
keyInBuf
,
pLastBlockReader
))
{
code
=
doLoadFileBlockData
(
pReader
,
pBlockIter
,
&
pStatus
->
fileBlockData
,
pScanInfo
->
uid
);
...
...
@@ -3837,7 +3768,10 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
}
if
(
pReader
->
pSchema
!=
NULL
)
{
updateBlockSMAInfo
(
pReader
->
pSchema
,
&
pReader
->
suppInfo
);
code
=
updateBlockSMAInfo
(
pReader
->
pSchema
,
&
pReader
->
suppInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_err
;
}
}
STsdbReader
*
p
=
(
pReader
->
innerReader
[
0
]
!=
NULL
)
?
pReader
->
innerReader
[
0
]
:
pReader
;
...
...
@@ -4113,25 +4047,27 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_
}
int32_t
tsdbRetrieveDatablockSMA
(
STsdbReader
*
pReader
,
SSDataBlock
*
pDataBlock
,
bool
*
allHave
)
{
SColumnDataAgg
***
pBlockSMA
=
&
pDataBlock
->
pBlockAgg
;
int32_t
code
=
0
;
SColumnDataAgg
***
pBlockSMA
=
&
pDataBlock
->
pBlockAgg
;
*
allHave
=
false
;
*
pBlockSMA
=
NULL
;
if
(
pReader
->
type
==
TIMEWINDOW_RANGE_EXTERNAL
)
{
*
pBlockSMA
=
NULL
;
return
TSDB_CODE_SUCCESS
;
}
// there is no statistics data for composed block
if
(
pReader
->
status
.
composedDataBlock
||
(
!
pReader
->
suppInfo
.
smaValid
))
{
*
pBlockSMA
=
NULL
;
return
TSDB_CODE_SUCCESS
;
}
SFileDataBlockInfo
*
pFBlock
=
getCurrentBlockInfo
(
&
pReader
->
status
.
blockIter
);
SBlockLoadSuppInfo
*
pSup
=
&
pReader
->
suppInfo
;
ASSERT
(
pReader
->
pResBlock
->
info
.
id
.
uid
==
pFBlock
->
uid
);
if
(
pReader
->
pResBlock
->
info
.
id
.
uid
!=
pFBlock
->
uid
)
{
return
TSDB_CODE_SUCCESS
;
}
SDataBlk
*
pBlock
=
getCurrentBlock
(
&
pReader
->
status
.
blockIter
);
if
(
tDataBlkHasSma
(
pBlock
))
{
...
...
@@ -4187,7 +4123,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock,
}
else
if
(
pAgg
->
colId
<
pSup
->
colId
[
j
])
{
i
+=
1
;
}
else
if
(
pSup
->
colId
[
j
]
<
pAgg
->
colId
)
{
ASSERT
(
pSup
->
colId
[
j
]
==
PRIMARYKEY_TIMESTAMP_COL_ID
);
//
ASSERT(pSup->colId[j] == PRIMARYKEY_TIMESTAMP_COL_ID);
pResBlock
->
pBlockAgg
[
pSup
->
slotId
[
j
]]
=
&
pSup
->
tsColAgg
;
j
+=
1
;
}
...
...
@@ -4418,9 +4354,12 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
return
terrno
;
}
sversion
=
mr
.
me
.
stbEntry
.
schemaRow
.
version
;
}
else
{
ASSERT
(
mr
.
me
.
type
==
TSDB_NORMAL_TABLE
);
}
else
if
(
mr
.
me
.
type
==
TSDB_NORMAL_TABLE
)
{
sversion
=
mr
.
me
.
ntbEntry
.
schemaRow
.
version
;
}
else
{
terrno
=
TSDB_CODE_INVALID_PARA
;
metaReaderClear
(
&
mr
);
return
terrno
;
}
metaReaderClear
(
&
mr
);
...
...
source/libs/executor/src/dataDeleter.c
浏览文件 @
a5165ab5
...
...
@@ -62,8 +62,8 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp
pEntry
->
numOfCols
=
taosArrayGetSize
(
pInput
->
pData
->
pDataBlock
);
pEntry
->
dataLen
=
sizeof
(
SDeleterRes
);
ASSERT
(
1
==
pEntry
->
numOfRows
);
ASSERT
(
3
==
pEntry
->
numOfCols
);
//
ASSERT(1 == pEntry->numOfRows);
//
ASSERT(3 == pEntry->numOfCols);
pBuf
->
useSize
=
sizeof
(
SDataCacheEntry
);
...
...
@@ -167,7 +167,6 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE
SDataDeleterBuf
*
pBuf
=
NULL
;
taosReadQitem
(
pDeleter
->
pDataBlocks
,
(
void
**
)
&
pBuf
);
ASSERT
(
NULL
!=
pBuf
);
memcpy
(
&
pDeleter
->
nextOutput
,
pBuf
,
sizeof
(
SDataDeleterBuf
));
taosFreeQitem
(
pBuf
);
...
...
source/libs/executor/src/dataDispatcher.c
浏览文件 @
a5165ab5
...
...
@@ -77,8 +77,8 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
pBuf
->
useSize
=
sizeof
(
SDataCacheEntry
);
pEntry
->
dataLen
=
blockEncode
(
pInput
->
pData
,
pEntry
->
data
,
numOfCols
);
ASSERT
(
pEntry
->
numOfRows
==
*
(
int32_t
*
)(
pEntry
->
data
+
8
));
ASSERT
(
pEntry
->
numOfCols
==
*
(
int32_t
*
)(
pEntry
->
data
+
8
+
4
));
//
ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8));
//
ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4));
pBuf
->
useSize
+=
pEntry
->
dataLen
;
...
...
@@ -162,15 +162,14 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE
SDataDispatchBuf
*
pBuf
=
NULL
;
taosReadQitem
(
pDispatcher
->
pDataBlocks
,
(
void
**
)
&
pBuf
);
ASSERT
(
NULL
!=
pBuf
);
memcpy
(
&
pDispatcher
->
nextOutput
,
pBuf
,
sizeof
(
SDataDispatchBuf
));
taosFreeQitem
(
pBuf
);
SDataCacheEntry
*
pEntry
=
(
SDataCacheEntry
*
)
pDispatcher
->
nextOutput
.
pData
;
*
pLen
=
pEntry
->
dataLen
;
ASSERT
(
pEntry
->
numOfRows
==
*
(
int32_t
*
)(
pEntry
->
data
+
8
));
ASSERT
(
pEntry
->
numOfCols
==
*
(
int32_t
*
)(
pEntry
->
data
+
8
+
4
));
//
ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8));
//
ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4));
*
pQueryEnd
=
pDispatcher
->
queryEnd
;
qDebug
(
"got data len %"
PRId64
", row num %d in sink"
,
*
pLen
,
...
...
@@ -193,8 +192,8 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
pOutput
->
numOfCols
=
pEntry
->
numOfCols
;
pOutput
->
compressed
=
pEntry
->
compressed
;
ASSERT
(
pEntry
->
numOfRows
==
*
(
int32_t
*
)(
pEntry
->
data
+
8
));
ASSERT
(
pEntry
->
numOfCols
==
*
(
int32_t
*
)(
pEntry
->
data
+
8
+
4
));
//
ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8));
//
ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4));
atomic_sub_fetch_64
(
&
pDispatcher
->
cachedSize
,
pEntry
->
dataLen
);
atomic_sub_fetch_64
(
&
gDataSinkStat
.
cachedSize
,
pEntry
->
dataLen
);
...
...
source/libs/executor/src/exchangeoperator.c
浏览文件 @
a5165ab5
...
...
@@ -373,7 +373,6 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
pRsp
->
useconds
=
htobe64
(
pRsp
->
useconds
);
pRsp
->
numOfBlocks
=
htonl
(
pRsp
->
numOfBlocks
);
ASSERT
(
pRsp
!=
NULL
);
qDebug
(
"%s fetch rsp received, index:%d, blocks:%d, rows:%"
PRId64
", %p"
,
pSourceDataInfo
->
taskId
,
index
,
pRsp
->
numOfBlocks
,
pRsp
->
numOfRows
,
pExchangeInfo
);
}
else
{
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
a5165ab5
...
...
@@ -104,8 +104,6 @@ static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock,
void
setOperatorCompleted
(
SOperatorInfo
*
pOperator
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
ASSERT
(
pOperator
->
pTaskInfo
!=
NULL
);
pOperator
->
cost
.
totalCost
=
(
taosGetTimestampUs
()
-
pOperator
->
pTaskInfo
->
cost
.
start
)
/
1000
.
0
;
setTaskStatus
(
pOperator
->
pTaskInfo
,
TASK_COMPLETED
);
}
...
...
@@ -524,7 +522,7 @@ bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
return
true
;
}
static
int32_t
doCreateConstantValColumn
Agg
Info
(
SInputColumnInfoData
*
pInput
,
SFunctParam
*
pFuncParam
,
int32_t
type
,
static
int32_t
doCreateConstantValColumn
SMA
Info
(
SInputColumnInfoData
*
pInput
,
SFunctParam
*
pFuncParam
,
int32_t
type
,
int32_t
paramIndex
,
int32_t
numOfRows
)
{
if
(
pInput
->
pData
[
paramIndex
]
==
NULL
)
{
pInput
->
pData
[
paramIndex
]
=
taosMemoryCalloc
(
1
,
sizeof
(
SColumnInfoData
));
...
...
@@ -548,8 +546,6 @@ static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SF
da
=
pInput
->
pColumnDataAgg
[
paramIndex
];
}
ASSERT
(
!
IS_VAR_DATA_TYPE
(
type
));
if
(
type
==
TSDB_DATA_TYPE_BIGINT
)
{
int64_t
v
=
pFuncParam
->
param
.
i
;
*
da
=
(
SColumnDataAgg
){.
numOfNull
=
0
,
.
min
=
v
,
.
max
=
v
,
.
sum
=
v
*
numOfRows
};
...
...
@@ -570,7 +566,7 @@ static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SF
}
else
if
(
type
==
TSDB_DATA_TYPE_TIMESTAMP
)
{
// do nothing
}
else
{
ASSERT
(
0
);
qError
(
"invalid constant type for sma info"
);
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -600,7 +596,7 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB
// the data in the corresponding SColumnInfoData will not be used.
pInput
->
pData
[
j
]
=
taosArrayGet
(
pBlock
->
pDataBlock
,
slotId
);
}
else
if
(
pFuncParam
->
type
==
FUNC_PARAM_TYPE_VALUE
)
{
doCreateConstantValColumn
Agg
Info
(
pInput
,
pFuncParam
,
pFuncParam
->
param
.
nType
,
j
,
pBlock
->
info
.
rows
);
doCreateConstantValColumn
SMA
Info
(
pInput
,
pFuncParam
,
pFuncParam
->
param
.
nType
,
j
,
pBlock
->
info
.
rows
);
}
}
}
else
{
...
...
@@ -2217,13 +2213,11 @@ int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
*
ppNode
=
(
STableScanPhysiNode
*
)
pNode
;
return
0
;
}
else
{
ASSERT
(
0
);
terrno
=
TSDB_CODE_APP_ERROR
;
return
-
1
;
}
}
else
{
if
(
LIST_LENGTH
(
pNode
->
pChildren
)
!=
1
)
{
ASSERT
(
0
);
terrno
=
TSDB_CODE_APP_ERROR
;
return
-
1
;
}
...
...
@@ -2233,32 +2227,6 @@ int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
return
-
1
;
}
#if 0
int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHandle, int64_t uid, int64_t ts) {
STableScanInfo* pTableScanInfo = NULL;
if (extractTbscanInStreamOpTree(pOperator, &pTableScanInfo) < 0) {
return -1;
}
STableScanPhysiNode* pNode = NULL;
if (extractTableScanNode(plan->pNode, &pNode) < 0) {
ASSERT(0);
}
tsdbReaderClose(pTableScanInfo->dataReader);
STableListInfo info = {0};
pTableScanInfo->dataReader = doCreateDataReader(pNode, pHandle, &info, NULL);
if (pTableScanInfo->dataReader == NULL) {
ASSERT(0);
qError("failed to create data reader");
return TSDB_CODE_APP_ERROR;
}
// TODO: set uid and ts to data reader
return 0;
}
#endif
int32_t
createDataSinkParam
(
SDataSinkNode
*
pNode
,
void
**
pParam
,
qTaskInfo_t
*
pTaskInfo
,
SReadHandle
*
readHandle
)
{
SExecTaskInfo
*
pTask
=
*
(
SExecTaskInfo
**
)
pTaskInfo
;
...
...
source/libs/executor/src/joinoperator.c
浏览文件 @
a5165ab5
...
...
@@ -42,38 +42,40 @@ typedef struct SJoinOperatorInfo {
static
void
setJoinColumnInfo
(
SColumnInfo
*
pColumn
,
const
SColumnNode
*
pColumnNode
);
static
SSDataBlock
*
doMergeJoin
(
struct
SOperatorInfo
*
pOperator
);
static
void
destroyMergeJoinOperator
(
void
*
param
);
static
void
extractTimeCondition
(
SJoinOperatorInfo
*
pInfo
,
SOperatorInfo
**
pDownstream
,
int32_t
num
OfDownstream
,
SSortMergeJoinPhysiNode
*
pJoinNode
);
static
void
extractTimeCondition
(
SJoinOperatorInfo
*
pInfo
,
SOperatorInfo
**
pDownstream
,
int32_t
num
,
SSortMergeJoinPhysiNode
*
pJoinNode
,
const
char
*
idStr
);
static
void
extractTimeCondition
(
SJoinOperatorInfo
*
pInfo
,
SOperatorInfo
**
pDownstream
,
int32_t
numOfDownstrea
m
,
SSortMergeJoinPhysiNode
*
pJoinNode
)
{
static
void
extractTimeCondition
(
SJoinOperatorInfo
*
pInfo
,
SOperatorInfo
**
pDownstream
,
int32_t
nu
m
,
SSortMergeJoinPhysiNode
*
pJoinNode
,
const
char
*
idStr
)
{
SNode
*
pMergeCondition
=
pJoinNode
->
pMergeCondition
;
if
(
nodeType
(
pMergeCondition
)
==
QUERY_NODE_OPERATOR
)
{
SOperatorNode
*
pNode
=
(
SOperatorNode
*
)
pMergeCondition
;
SColumnNode
*
col1
=
(
SColumnNode
*
)
pNode
->
pLeft
;
SColumnNode
*
col2
=
(
SColumnNode
*
)
pNode
->
pRight
;
SColumnNode
*
leftTsCol
=
NULL
;
SColumnNode
*
rightTsCol
=
NULL
;
if
(
col1
->
dataBlockId
==
col2
->
dataBlockId
)
{
if
(
nodeType
(
pMergeCondition
)
!=
QUERY_NODE_OPERATOR
)
{
qError
(
"not support this in join operator, %s"
,
idStr
);
return
;
// do not handle this
}
SOperatorNode
*
pNode
=
(
SOperatorNode
*
)
pMergeCondition
;
SColumnNode
*
col1
=
(
SColumnNode
*
)
pNode
->
pLeft
;
SColumnNode
*
col2
=
(
SColumnNode
*
)
pNode
->
pRight
;
SColumnNode
*
leftTsCol
=
NULL
;
SColumnNode
*
rightTsCol
=
NULL
;
if
(
col1
->
dataBlockId
==
col2
->
dataBlockId
)
{
leftTsCol
=
col1
;
rightTsCol
=
col2
;
}
else
{
if
(
col1
->
dataBlockId
==
pDownstream
[
0
]
->
resultDataBlockId
)
{
ASSERT
(
col2
->
dataBlockId
==
pDownstream
[
1
]
->
resultDataBlockId
);
leftTsCol
=
col1
;
rightTsCol
=
col2
;
}
else
{
if
(
col1
->
dataBlockId
==
pDownstream
[
0
]
->
resultDataBlockId
)
{
ASSERT
(
col2
->
dataBlockId
==
pDownstream
[
1
]
->
resultDataBlockId
);
leftTsCol
=
col1
;
rightTsCol
=
col2
;
}
else
{
ASSERT
(
col1
->
dataBlockId
==
pDownstream
[
1
]
->
resultDataBlockId
);
ASSERT
(
col2
->
dataBlockId
==
pDownstream
[
0
]
->
resultDataBlockId
);
leftTsCol
=
col2
;
rightTsCol
=
col1
;
}
ASSERT
(
col1
->
dataBlockId
==
pDownstream
[
1
]
->
resultDataBlockId
);
ASSERT
(
col2
->
dataBlockId
==
pDownstream
[
0
]
->
resultDataBlockId
);
leftTsCol
=
col2
;
rightTsCol
=
col1
;
}
setJoinColumnInfo
(
&
pInfo
->
leftCol
,
leftTsCol
);
setJoinColumnInfo
(
&
pInfo
->
rightCol
,
rightTsCol
);
}
else
{
ASSERT
(
false
);
}}
}
setJoinColumnInfo
(
&
pInfo
->
leftCol
,
leftTsCol
);
setJoinColumnInfo
(
&
pInfo
->
rightCol
,
rightTsCol
);
}
SOperatorInfo
*
createMergeJoinOperatorInfo
(
SOperatorInfo
**
pDownstream
,
int32_t
numOfDownstream
,
SSortMergeJoinPhysiNode
*
pJoinNode
,
SExecTaskInfo
*
pTaskInfo
)
{
...
...
@@ -97,7 +99,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
pOperator
->
exprSupp
.
pExprInfo
=
pExprInfo
;
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
extractTimeCondition
(
pInfo
,
pDownstream
,
numOfDownstream
,
pJoinNode
);
extractTimeCondition
(
pInfo
,
pDownstream
,
numOfDownstream
,
pJoinNode
,
GET_TASKID
(
pTaskInfo
)
);
if
(
pJoinNode
->
pOnConditions
!=
NULL
&&
pJoinNode
->
node
.
pConditions
!=
NULL
)
{
pInfo
->
pCondAfterMerge
=
nodesMakeNode
(
QUERY_NODE_LOGIC_CONDITION
);
...
...
@@ -364,8 +366,6 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs
char
*
pRightVal
=
colDataGetData
(
pRightCol
,
pJoinInfo
->
rightPos
);
*
pRightTs
=
*
(
int64_t
*
)
pRightVal
;
ASSERT
(
pLeftCol
->
info
.
type
==
TSDB_DATA_TYPE_TIMESTAMP
);
ASSERT
(
pRightCol
->
info
.
type
==
TSDB_DATA_TYPE_TIMESTAMP
);
return
true
;
}
...
...
source/libs/executor/src/sortoperator.c
浏览文件 @
a5165ab5
...
...
@@ -139,7 +139,6 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
int32_t
numOfCols
=
taosArrayGetSize
(
pColMatchInfo
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColMatchItem
*
pmInfo
=
taosArrayGet
(
pColMatchInfo
,
i
);
// ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID);
SColumnInfoData
*
pSrc
=
taosArrayGet
(
p
->
pDataBlock
,
pmInfo
->
srcSlotId
);
SColumnInfoData
*
pDst
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
pmInfo
->
dstSlotId
);
...
...
@@ -272,7 +271,6 @@ void destroySortOperatorInfo(void* param) {
}
int32_t
getExplainExecInfo
(
SOperatorInfo
*
pOptr
,
void
**
pOptrExplain
,
uint32_t
*
len
)
{
ASSERT
(
pOptr
!=
NULL
);
SSortExecInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SSortExecInfo
));
SSortOperatorInfo
*
pOperatorInfo
=
(
SSortOperatorInfo
*
)
pOptr
->
info
;
...
...
@@ -329,7 +327,6 @@ SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo
int32_t
numOfCols
=
taosArrayGetSize
(
pColMatchInfo
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColMatchItem
*
pmInfo
=
taosArrayGet
(
pColMatchInfo
,
i
);
// ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID);
SColumnInfoData
*
pSrc
=
taosArrayGet
(
p
->
pDataBlock
,
pmInfo
->
srcSlotId
);
SColumnInfoData
*
pDst
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
pmInfo
->
dstSlotId
);
...
...
@@ -746,7 +743,6 @@ void destroyMultiwayMergeOperatorInfo(void* param) {
}
int32_t
getMultiwayMergeExplainExecInfo
(
SOperatorInfo
*
pOptr
,
void
**
pOptrExplain
,
uint32_t
*
len
)
{
ASSERT
(
pOptr
!=
NULL
);
SSortExecInfo
*
pSortExecInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SSortExecInfo
));
SMultiwayMergeOperatorInfo
*
pInfo
=
(
SMultiwayMergeOperatorInfo
*
)
pOptr
->
info
;
...
...
source/libs/executor/src/tsimplehash.c
浏览文件 @
a5165ab5
...
...
@@ -49,7 +49,9 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
}
SSHashObj
*
tSimpleHashInit
(
size_t
capacity
,
_hash_fn_t
fn
)
{
ASSERT
(
fn
!=
NULL
);
if
(
fn
==
NULL
)
{
return
NULL
;
}
if
(
capacity
==
0
)
{
capacity
=
4
;
...
...
@@ -66,7 +68,6 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn) {
pHashObj
->
equalFp
=
memcmp
;
pHashObj
->
hashFp
=
fn
;
ASSERT
((
pHashObj
->
capacity
&
(
pHashObj
->
capacity
-
1
))
==
0
);
pHashObj
->
hashList
=
(
SHNode
**
)
taosMemoryCalloc
(
pHashObj
->
capacity
,
sizeof
(
void
*
));
if
(
!
pHashObj
->
hashList
)
{
...
...
source/libs/executor/src/tsort.c
浏览文件 @
a5165ab5
...
...
@@ -800,6 +800,7 @@ STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
}
}
// all sources are completed.
if
(
pHandle
->
cmpParam
.
numOfSources
==
pHandle
->
numOfCompletedSources
)
{
return
NULL
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录