Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
cdbfe53c
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
cdbfe53c
编写于
4月 26, 2022
作者:
H
Haojun Liao
提交者:
GitHub
4月 26, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #11884 from taosdata/feature/3.0_liaohj
fix(query): add an null ptr check
上级
f7cd02a9
3461d97b
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
275 addition
and
218 deletion
+275
-218
include/common/tcommon.h
include/common/tcommon.h
+9
-1
include/libs/function/function.h
include/libs/function/function.h
+1
-1
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+5
-1
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+3
-12
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+31
-49
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+16
-10
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+149
-85
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+61
-59
未找到文件。
include/common/tcommon.h
浏览文件 @
cdbfe53c
...
...
@@ -99,6 +99,15 @@ typedef struct SColumnInfoData {
};
}
SColumnInfoData
;
typedef
struct
SQueryTableDataCond
{
STimeWindow
twindow
;
int32_t
order
;
// desc|asc order to iterate the data block
int32_t
numOfCols
;
SColumnInfo
*
colList
;
bool
loadExternalRows
;
// load external rows or not
int32_t
type
;
// data block load type:
}
SQueryTableDataCond
;
void
*
blockDataDestroy
(
SSDataBlock
*
pBlock
);
int32_t
tEncodeDataBlock
(
void
**
buf
,
const
SSDataBlock
*
pBlock
);
void
*
tDecodeDataBlock
(
const
void
*
buf
,
SSDataBlock
*
pBlock
);
...
...
@@ -229,7 +238,6 @@ typedef struct SResSchame {
char
name
[
TSDB_COL_NAME_LEN
];
}
SResSchema
;
// TODO move away to executor.h
typedef
struct
SExprBasicInfo
{
SResSchema
resSchema
;
int16_t
numOfParams
;
// argument value of each function
...
...
include/libs/function/function.h
浏览文件 @
cdbfe53c
...
...
@@ -202,7 +202,7 @@ typedef struct SqlFunctionCtx {
SPoint1
end
;
SFuncExecFuncs
fpSet
;
SScalarFuncExecFuncs
sfp
;
SExprInfo
*
pExpr
;
struct
SExprInfo
*
pExpr
;
struct
SDiskbasedBuf
*
pBuf
;
struct
SSDataBlock
*
pSrcBlock
;
int32_t
curBufPage
;
...
...
source/common/src/tdatablock.c
浏览文件 @
cdbfe53c
...
...
@@ -79,7 +79,11 @@ int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRo
if
(
IS_VAR_DATA_TYPE
(
pColumnInfoData
->
info
.
type
))
{
return
pColumnInfoData
->
varmeta
.
length
;
}
else
{
return
pColumnInfoData
->
info
.
bytes
*
numOfRows
;
if
(
pColumnInfoData
->
info
.
type
==
TSDB_DATA_TYPE_NULL
)
{
return
0
;
}
else
{
return
pColumnInfoData
->
info
.
bytes
*
numOfRows
;
}
}
}
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
cdbfe53c
...
...
@@ -80,16 +80,15 @@ char *metaTbCursorNext(SMTbCursor *pTbCur);
// tsdb
typedef
struct
STsdb
STsdb
;
typedef
struct
STsdbQueryCond
STsdbQueryCond
;
typedef
void
*
tsdbReaderT
;
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3
tsdbReaderT
*
tsdbQueryTables
(
STsdb
*
tsdb
,
S
TsdbQuery
Cond
*
pCond
,
STableGroupInfo
*
tableInfoGroup
,
uint64_t
qId
,
tsdbReaderT
*
tsdbQueryTables
(
STsdb
*
tsdb
,
S
QueryTableData
Cond
*
pCond
,
STableGroupInfo
*
tableInfoGroup
,
uint64_t
qId
,
uint64_t
taskId
);
tsdbReaderT
tsdbQueryCacheLast
(
STsdb
*
tsdb
,
S
TsdbQuery
Cond
*
pCond
,
STableGroupInfo
*
groupList
,
uint64_t
qId
,
tsdbReaderT
tsdbQueryCacheLast
(
STsdb
*
tsdb
,
S
QueryTableData
Cond
*
pCond
,
STableGroupInfo
*
groupList
,
uint64_t
qId
,
void
*
pMemRef
);
int32_t
tsdbGetFileBlocksDistInfo
(
tsdbReaderT
*
pReader
,
STableBlockDistInfo
*
pTableBlockInfo
);
bool
isTsdbCacheLastRow
(
tsdbReaderT
*
pReader
);
...
...
@@ -101,6 +100,7 @@ bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
void
tsdbRetrieveDataBlockInfo
(
tsdbReaderT
*
pTsdbReadHandle
,
SDataBlockInfo
*
pBlockInfo
);
int32_t
tsdbRetrieveDataBlockStatisInfo
(
tsdbReaderT
*
pTsdbReadHandle
,
SColumnDataAgg
**
pBlockStatis
);
SArray
*
tsdbRetrieveDataBlock
(
tsdbReaderT
*
pTsdbReadHandle
,
SArray
*
pColumnIdList
);
void
tsdbResetReadHandle
(
tsdbReaderT
queryHandle
,
SQueryTableDataCond
*
pCond
);
void
tsdbDestroyTableGroup
(
STableGroupInfo
*
pGroupList
);
int32_t
tsdbGetOneTableGroup
(
void
*
pMeta
,
uint64_t
uid
,
TSKEY
startKey
,
STableGroupInfo
*
pGroupInfo
);
int32_t
tsdbGetTableGroupFromIdList
(
STsdb
*
tsdb
,
SArray
*
pTableIdList
,
STableGroupInfo
*
pGroupInfo
);
...
...
@@ -160,15 +160,6 @@ struct SVnodeCfg {
int8_t
hashMethod
;
};
struct
STsdbQueryCond
{
STimeWindow
twindow
;
int32_t
order
;
// desc|asc order to iterate the data block
int32_t
numOfCols
;
SColumnInfo
*
colList
;
bool
loadExternalRows
;
// load external rows or not
int32_t
type
;
// data block load type:
};
typedef
struct
{
TSKEY
lastKey
;
uint64_t
uid
;
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
cdbfe53c
...
...
@@ -254,7 +254,7 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
assert
(
info
.
lastKey
>=
pTsdbReadHandle
->
window
.
skey
&&
info
.
lastKey
<=
pTsdbReadHandle
->
window
.
ekey
);
}
else
{
assert
(
info
.
lastKey
>=
pTsdbReadHandle
->
window
.
ekey
&&
info
.
lastKey
<=
pTsdbReadHandle
->
window
.
skey
)
;
info
.
lastKey
=
pTsdbReadHandle
->
window
.
skey
;
}
taosArrayPush
(
pTableCheckInfo
,
&
info
);
...
...
@@ -317,7 +317,7 @@ static int64_t getEarliestValidTimestamp(STsdb* pTsdb) {
return
now
-
(
tsTickPerDay
[
pCfg
->
precision
]
*
pCfg
->
keep2
)
+
1
;
// needs to add one tick
}
static
void
setQueryTimewindow
(
STsdbReadHandle
*
pTsdbReadHandle
,
S
TsdbQuery
Cond
*
pCond
)
{
static
void
setQueryTimewindow
(
STsdbReadHandle
*
pTsdbReadHandle
,
S
QueryTableData
Cond
*
pCond
)
{
pTsdbReadHandle
->
window
=
pCond
->
twindow
;
bool
updateTs
=
false
;
...
...
@@ -343,7 +343,7 @@ static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, STsdbQueryCond*
}
}
static
STsdbReadHandle
*
tsdbQueryTablesImpl
(
STsdb
*
tsdb
,
S
TsdbQuery
Cond
*
pCond
,
uint64_t
qId
,
uint64_t
taskId
)
{
static
STsdbReadHandle
*
tsdbQueryTablesImpl
(
STsdb
*
tsdb
,
S
QueryTableData
Cond
*
pCond
,
uint64_t
qId
,
uint64_t
taskId
)
{
STsdbReadHandle
*
pReadHandle
=
taosMemoryCalloc
(
1
,
sizeof
(
STsdbReadHandle
));
if
(
pReadHandle
==
NULL
)
{
goto
_end
;
...
...
@@ -422,7 +422,7 @@ _end:
return
NULL
;
}
tsdbReaderT
*
tsdbQueryTables
(
STsdb
*
tsdb
,
S
TsdbQuery
Cond
*
pCond
,
STableGroupInfo
*
groupList
,
uint64_t
qId
,
tsdbReaderT
*
tsdbQueryTables
(
STsdb
*
tsdb
,
S
QueryTableData
Cond
*
pCond
,
STableGroupInfo
*
groupList
,
uint64_t
qId
,
uint64_t
taskId
)
{
STsdbReadHandle
*
pTsdbReadHandle
=
tsdbQueryTablesImpl
(
tsdb
,
pCond
,
qId
,
taskId
);
if
(
pTsdbReadHandle
==
NULL
)
{
...
...
@@ -448,7 +448,7 @@ tsdbReaderT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo
return
(
tsdbReaderT
)
pTsdbReadHandle
;
}
void
tsdbReset
QueryHandle
(
tsdbReaderT
queryHandle
,
STsdbQuery
Cond
*
pCond
)
{
void
tsdbReset
ReadHandle
(
tsdbReaderT
queryHandle
,
SQueryTableData
Cond
*
pCond
)
{
STsdbReadHandle
*
pTsdbReadHandle
=
queryHandle
;
if
(
emptyQueryTimewindow
(
pTsdbReadHandle
))
{
...
...
@@ -460,9 +460,9 @@ void tsdbResetQueryHandle(tsdbReaderT queryHandle, STsdbQueryCond* pCond) {
return
;
}
pTsdbReadHandle
->
order
=
pCond
->
order
;
pTsdbReadHandle
->
window
=
pCond
->
twindow
;
pTsdbReadHandle
->
type
=
TSDB_QUERY_TYPE_ALL
;
pTsdbReadHandle
->
order
=
pCond
->
order
;
pTsdbReadHandle
->
window
=
pCond
->
twindow
;
pTsdbReadHandle
->
type
=
TSDB_QUERY_TYPE_ALL
;
pTsdbReadHandle
->
cur
.
fid
=
-
1
;
pTsdbReadHandle
->
cur
.
win
=
TSWINDOW_INITIALIZER
;
pTsdbReadHandle
->
checkFiles
=
true
;
...
...
@@ -485,7 +485,7 @@ void tsdbResetQueryHandle(tsdbReaderT queryHandle, STsdbQueryCond* pCond) {
resetCheckInfo
(
pTsdbReadHandle
);
}
void
tsdbResetQueryHandleForNewTable
(
tsdbReaderT
queryHandle
,
S
TsdbQuery
Cond
*
pCond
,
STableGroupInfo
*
groupList
)
{
void
tsdbResetQueryHandleForNewTable
(
tsdbReaderT
queryHandle
,
S
QueryTableData
Cond
*
pCond
,
STableGroupInfo
*
groupList
)
{
STsdbReadHandle
*
pTsdbReadHandle
=
queryHandle
;
pTsdbReadHandle
->
order
=
pCond
->
order
;
...
...
@@ -526,7 +526,7 @@ void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, STsdbQueryCond* pC
// pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
}
tsdbReaderT
tsdbQueryLastRow
(
STsdb
*
tsdb
,
S
TsdbQuery
Cond
*
pCond
,
STableGroupInfo
*
groupList
,
uint64_t
qId
,
tsdbReaderT
tsdbQueryLastRow
(
STsdb
*
tsdb
,
S
QueryTableData
Cond
*
pCond
,
STableGroupInfo
*
groupList
,
uint64_t
qId
,
uint64_t
taskId
)
{
pCond
->
twindow
=
updateLastrowForEachGroup
(
groupList
);
...
...
@@ -555,7 +555,7 @@ tsdbReaderT tsdbQueryLastRow(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo
}
#if 0
tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, S
TsdbQuery
Cond *pCond, STableGroupInfo *groupList, uint64_t qId, STsdbMemTable* pMemRef) {
tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, S
QueryTableData
Cond *pCond, STableGroupInfo *groupList, uint64_t qId, STsdbMemTable* pMemRef) {
STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef);
if (pTsdbReadHandle == NULL) {
return NULL;
...
...
@@ -618,7 +618,7 @@ static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGr
return
pNew
;
}
tsdbReaderT
tsdbQueryRowsInExternalWindow
(
STsdb
*
tsdb
,
S
TsdbQuery
Cond
*
pCond
,
STableGroupInfo
*
groupList
,
uint64_t
qId
,
tsdbReaderT
tsdbQueryRowsInExternalWindow
(
STsdb
*
tsdb
,
S
QueryTableData
Cond
*
pCond
,
STableGroupInfo
*
groupList
,
uint64_t
qId
,
uint64_t
taskId
)
{
STableGroupInfo
*
pNew
=
trimTableGroup
(
&
pCond
->
twindow
,
groupList
);
...
...
@@ -1185,10 +1185,11 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
tsdbDebug
(
"%p no data in mem, %s"
,
pTsdbReadHandle
,
pTsdbReadHandle
->
idStr
);
}
if
((
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
)
&&
(
key
!=
TSKEY_INITIAL_VAL
&&
key
<=
binfo
.
window
.
ekey
))
||
(
!
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
)
&&
(
key
!=
TSKEY_INITIAL_VAL
&&
key
>=
binfo
.
window
.
skey
)))
{
if
((
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
)
&&
(
key
!=
TSKEY_INITIAL_VAL
&&
key
<
binfo
.
window
.
skey
))
||
(
!
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
)
&&
(
key
!=
TSKEY_INITIAL_VAL
&&
key
>
binfo
.
window
.
ekey
)))
{
bool
ascScan
=
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
);
if
((
ascScan
&&
(
key
!=
TSKEY_INITIAL_VAL
&&
key
<=
binfo
.
window
.
ekey
))
||
(
!
ascScan
&&
(
key
!=
TSKEY_INITIAL_VAL
&&
key
>=
binfo
.
window
.
skey
)))
{
if
((
ascScan
&&
(
key
!=
TSKEY_INITIAL_VAL
&&
key
<
binfo
.
window
.
skey
))
||
(
!
ascScan
&&
(
key
!=
TSKEY_INITIAL_VAL
&&
key
>
binfo
.
window
.
ekey
)))
{
// do not load file block into buffer
int32_t
step
=
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
)
?
1
:
-
1
;
...
...
@@ -1225,8 +1226,7 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
assert
(
pTsdbReadHandle
->
outputCapacity
>=
binfo
.
rows
);
int32_t
endPos
=
getEndPosInDataBlock
(
pTsdbReadHandle
,
&
binfo
);
if
((
cur
->
pos
==
0
&&
endPos
==
binfo
.
rows
-
1
&&
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
))
||
(
cur
->
pos
==
(
binfo
.
rows
-
1
)
&&
endPos
==
0
&&
(
!
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
))))
{
if
((
cur
->
pos
==
0
&&
endPos
==
binfo
.
rows
-
1
&&
ascScan
)
||
(
cur
->
pos
==
(
binfo
.
rows
-
1
)
&&
endPos
==
0
&&
(
!
ascScan
)))
{
pTsdbReadHandle
->
realNumOfRows
=
binfo
.
rows
;
cur
->
rows
=
binfo
.
rows
;
...
...
@@ -1234,7 +1234,7 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
cur
->
mixBlock
=
false
;
cur
->
blockCompleted
=
true
;
if
(
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
)
)
{
if
(
ascScan
)
{
cur
->
lastKey
=
binfo
.
window
.
ekey
+
1
;
cur
->
pos
=
binfo
.
rows
;
}
else
{
...
...
@@ -1382,8 +1382,6 @@ static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
static
int32_t
doCopyRowsFromFileBlock
(
STsdbReadHandle
*
pTsdbReadHandle
,
int32_t
capacity
,
int32_t
numOfRows
,
int32_t
start
,
int32_t
end
)
{
int32_t
step
=
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
)
?
1
:
-
1
;
SDataCols
*
pCols
=
pTsdbReadHandle
->
rhelper
.
pDCols
[
0
];
TSKEY
*
tsArray
=
pCols
->
cols
[
0
].
pData
;
...
...
@@ -1394,6 +1392,11 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
return
numOfRows
;
}
bool
ascScan
=
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
);
int32_t
trueStart
=
ascScan
?
start
:
end
;
int32_t
trueEnd
=
ascScan
?
end
:
start
;
int32_t
step
=
ascScan
?
1
:
-
1
;
int32_t
requiredNumOfCols
=
(
int32_t
)
taosArrayGetSize
(
pTsdbReadHandle
->
pColumns
);
// data in buffer has greater timestamp, copy data in file block
...
...
@@ -1411,7 +1414,7 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
if
(
!
IS_VAR_DATA_TYPE
(
pColInfo
->
info
.
type
))
{
// todo opt performance
// memmove(pData, (char*)src->pData + bytes * start, bytes * num);
int32_t
rowIndex
=
numOfRows
;
for
(
int32_t
k
=
start
;
k
<=
end
;
++
k
,
++
rowIndex
)
{
for
(
int32_t
k
=
trueStart
;
((
ascScan
&&
k
<=
trueEnd
)
||
(
!
ascScan
&&
k
>=
trueEnd
));
k
+=
step
,
++
rowIndex
)
{
SCellVal
sVal
=
{
0
};
if
(
tdGetColDataOfRow
(
&
sVal
,
src
,
k
,
pCols
->
bitmapMode
)
<
0
)
{
TASSERT
(
0
);
...
...
@@ -1427,7 +1430,7 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
int32_t
rowIndex
=
numOfRows
;
// todo refactor, only copy one-by-one
for
(
int32_t
k
=
start
;
k
<
num
+
start
;
++
k
,
++
rowIndex
)
{
for
(
int32_t
k
=
trueStart
;
((
ascScan
&&
k
<=
trueEnd
)
||
(
!
ascScan
&&
k
>=
trueEnd
));
k
+=
step
,
++
rowIndex
)
{
SCellVal
sVal
=
{
0
};
if
(
tdGetColDataOfRow
(
&
sVal
,
src
,
k
,
pCols
->
bitmapMode
)
<
0
)
{
TASSERT
(
0
);
...
...
@@ -1444,26 +1447,19 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
j
++
;
i
++
;
}
else
{
// pColInfo->info.colId < src->colId, it is a NULL data
int32_t
rowIndex
=
numOfRows
;
for
(
int32_t
k
=
start
;
k
<
num
+
start
;
++
k
,
++
rowIndex
)
{
// TODO opt performance
colDataAppend
(
pColInfo
,
rowIndex
,
NULL
,
true
);
}
colDataAppendNNULL
(
pColInfo
,
numOfRows
,
num
);
i
++
;
}
}
while
(
i
<
requiredNumOfCols
)
{
// the remain columns are all null data
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pTsdbReadHandle
->
pColumns
,
i
);
int32_t
rowIndex
=
numOfRows
;
for
(
int32_t
k
=
start
;
k
<
num
+
start
;
++
k
,
++
rowIndex
)
{
colDataAppend
(
pColInfo
,
rowIndex
,
NULL
,
true
);
// TODO add a fast version to set a number of consecutive NULL value.
}
colDataAppendNNULL
(
pColInfo
,
numOfRows
,
num
);
i
++
;
}
pTsdbReadHandle
->
cur
.
win
.
ekey
=
tsArray
[
e
nd
];
pTsdbReadHandle
->
cur
.
lastKey
=
tsArray
[
e
nd
]
+
step
;
pTsdbReadHandle
->
cur
.
win
.
ekey
=
tsArray
[
trueE
nd
];
pTsdbReadHandle
->
cur
.
lastKey
=
tsArray
[
trueE
nd
]
+
step
;
return
numOfRows
+
num
;
}
...
...
@@ -2966,7 +2962,7 @@ bool tsdbNextDataBlock(tsdbReaderT pHandle) {
// }
//
// // load the previous row
// S
TsdbQuery
Cond cond = {.numOfCols = numOfCols, .loadExternalRows = false, .type = BLOCK_LOAD_OFFSET_SEQ_ORDER};
// S
QueryTableData
Cond cond = {.numOfCols = numOfCols, .loadExternalRows = false, .type = BLOCK_LOAD_OFFSET_SEQ_ORDER};
// if (type == TSDB_PREV_ROW) {
// cond.order = TSDB_ORDER_DESC;
// cond.twindow = (STimeWindow){pTsdbReadHandle->window.skey, INT64_MIN};
...
...
@@ -3330,21 +3326,7 @@ SArray* tsdbRetrieveDataBlock(tsdbReaderT* pTsdbReadHandle, SArray* pIdList) {
return
NULL
;
}
// todo refactor
int32_t
numOfRows
=
doCopyRowsFromFileBlock
(
pHandle
,
pHandle
->
outputCapacity
,
0
,
0
,
pBlock
->
numOfRows
-
1
);
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
if
(
!
ASCENDING_TRAVERSE
(
pHandle
->
order
)
&&
numOfRows
<
pHandle
->
outputCapacity
)
{
int32_t
emptySize
=
pHandle
->
outputCapacity
-
numOfRows
;
int32_t
reqNumOfCols
=
(
int32_t
)
taosArrayGetSize
(
pHandle
->
pColumns
);
for
(
int32_t
i
=
0
;
i
<
reqNumOfCols
;
++
i
)
{
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pHandle
->
pColumns
,
i
);
memmove
((
char
*
)
pColInfo
->
pData
,
(
char
*
)
pColInfo
->
pData
+
emptySize
*
pColInfo
->
info
.
bytes
,
numOfRows
*
pColInfo
->
info
.
bytes
);
}
}
return
pHandle
->
pColumns
;
}
}
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
cdbfe53c
...
...
@@ -319,27 +319,32 @@ typedef struct SColMatchInfo {
bool
output
;
}
SColMatchInfo
;
typedef
struct
SScanInfo
{
int32_t
numOfAsc
;
int32_t
numOfDesc
;
}
SScanInfo
;
typedef
struct
STableScanInfo
{
void
*
dataReader
;
int32_t
numOfBlocks
;
// extract basic running information.
int32_t
numOfSkipped
;
int32_t
numOfBlockStatis
;
int64_t
numOfRows
;
int32_t
order
;
// scan order
int32_t
times
;
// repeat counts
int64_t
elapsedTime
;
int32_t
prevGroupId
;
// previous table group id
SScanInfo
scanInfo
;
int32_t
current
;
int32_t
reverseTimes
;
// 0 by default
SNode
*
pFilterNode
;
// filter operator info
SqlFunctionCtx
*
pCtx
;
// next operator query context
SNode
*
pFilterNode
;
// filter operator info
SqlFunctionCtx
*
pCtx
;
// next operator query context
SResultRowInfo
*
pResultRowInfo
;
int32_t
*
rowCellInfoOffset
;
SExprInfo
*
pExpr
;
SSDataBlock
*
pResBlock
;
SArray
*
pColMatchInfo
;
int32_t
numOfOutput
;
int64_t
elapsedTime
;
int32_t
prevGroupId
;
// previous table group id
SQueryTableDataCond
cond
;
int32_t
scanFlag
;
// table scan flag to denote if it is a repeat/reverse/main scan
int32_t
dataBlockLoadFlag
;
double
sampleRatio
;
// data block sample ratio, 1 by default
...
...
@@ -620,6 +625,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
int32_t
compLen
,
int32_t
numOfOutput
,
int64_t
startTs
,
uint64_t
*
total
,
SArray
*
pColList
);
void
getAlignQueryTimeWindow
(
SInterval
*
pInterval
,
int32_t
precision
,
int64_t
key
,
STimeWindow
*
win
);
int32_t
getTableScanOrder
(
SOperatorInfo
*
pOperator
);
void
doSetOperatorCompleted
(
SOperatorInfo
*
pOperator
);
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
);
...
...
@@ -627,9 +633,9 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
SOperatorInfo
*
createExchangeOperatorInfo
(
const
SNodeList
*
pSources
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTableScanOperatorInfo
(
void
*
p
ReaderHandle
,
int32_t
order
,
int32_t
numOfCols
,
int32_t
dataLoadFlag
,
int32_t
repeatTime
,
int32_t
reverseTime
,
SArray
*
pColMatchInfo
,
SSDataBlock
*
pResBlock
,
SNode
*
pCondition
,
SInterval
*
pInterval
,
double
ratio
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTableScanOperatorInfo
(
void
*
p
DataReader
,
SQueryTableDataCond
*
pCond
,
int32_t
numOfOutput
,
int32_t
dataLoadFlag
,
const
uint8_t
*
scanInfo
,
SArray
*
pColMatchInfo
,
SSDataBlock
*
pResBlock
,
SNode
*
pCondition
,
SInterval
*
pInterval
,
double
sampleRatio
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createAggregateOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SExprInfo
*
pScalarExprInfo
,
int32_t
numOfScalarExpr
,
SExecTaskInfo
*
pTaskInfo
,
const
STableGroupInfo
*
pTableGroupInfo
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
cdbfe53c
...
...
@@ -191,7 +191,7 @@ static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutpu
static
int32_t
setTimestampListJoinInfo
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SVariant
*
pTag
,
STableQueryInfo
*
pTableQueryInfo
);
static
void
releaseQueryBuf
(
size_t
numOfTables
);
static
int32_t
binarySearchForKey
(
char
*
pValue
,
int
num
,
TSKEY
key
,
int
order
);
// static S
TsdbQuery
Cond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win);
// static S
QueryTableData
Cond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win);
static
STableIdInfo
createTableIdInfo
(
STableQueryInfo
*
pTableQueryInfo
);
static
int32_t
getNumOfScanTimes
(
STaskAttr
*
pQueryAttr
);
...
...
@@ -207,7 +207,6 @@ static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
static
void
destroyIntervalOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
);
static
void
destroyExchangeOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
);
static
void
destroyConditionOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
);
static
void
destroyOperatorInfo
(
SOperatorInfo
*
pOperator
);
static
void
destroySysTableScannerOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
);
...
...
@@ -3588,8 +3587,8 @@ static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQuer
#endif
}
// S
TsdbQuery
Cond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win) {
// S
TsdbQuery
Cond cond = {
// S
QueryTableData
Cond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win) {
// S
QueryTableData
Cond cond = {
// .colList = pQueryAttr->tableCols,
// .order = pQueryAttr->order.order,
// .numOfCols = pQueryAttr->numOfCols,
...
...
@@ -4681,7 +4680,18 @@ _error:
return
NULL
;
}
static
int32_t
getTableScanOrder
(
STableScanInfo
*
pTableScanInfo
)
{
return
pTableScanInfo
->
order
;
}
int32_t
getTableScanOrder
(
SOperatorInfo
*
pOperator
)
{
if
(
pOperator
->
operatorType
!=
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
)
{
if
(
pOperator
->
pDownstream
==
NULL
||
pOperator
->
pDownstream
[
0
]
==
NULL
)
{
return
TSDB_ORDER_ASC
;
}
else
{
return
getTableScanOrder
(
pOperator
->
pDownstream
[
0
]);
}
}
STableScanInfo
*
pTableScanInfo
=
pOperator
->
info
;
return
pTableScanInfo
->
cond
.
order
;
}
// this is a blocking operator
static
int32_t
doOpenAggregateOptr
(
SOperatorInfo
*
pOperator
)
{
...
...
@@ -4885,6 +4895,76 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi
return
true
;
}
enum
{
PROJECT_RETRIEVE_CONTINUE
=
0x1
,
PROJECT_RETRIEVE_DONE
=
0x2
,
};
static
int32_t
handleLimitOffset
(
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pBlock
)
{
SProjectOperatorInfo
*
pProjectInfo
=
pOperator
->
info
;
SOptrBasicInfo
*
pInfo
=
&
pProjectInfo
->
binfo
;
SSDataBlock
*
pRes
=
pInfo
->
pRes
;
if
(
pProjectInfo
->
curSOffset
>
0
)
{
if
(
pProjectInfo
->
groupId
==
0
)
{
// it is the first group
pProjectInfo
->
groupId
=
pBlock
->
info
.
groupId
;
blockDataCleanup
(
pInfo
->
pRes
);
return
PROJECT_RETRIEVE_CONTINUE
;
}
else
if
(
pProjectInfo
->
groupId
!=
pBlock
->
info
.
groupId
)
{
pProjectInfo
->
curSOffset
-=
1
;
// ignore data block in current group
if
(
pProjectInfo
->
curSOffset
>
0
)
{
blockDataCleanup
(
pInfo
->
pRes
);
return
PROJECT_RETRIEVE_CONTINUE
;
}
}
// set current group id of the project operator
pProjectInfo
->
groupId
=
pBlock
->
info
.
groupId
;
}
if
(
pProjectInfo
->
groupId
!=
0
&&
pProjectInfo
->
groupId
!=
pBlock
->
info
.
groupId
)
{
pProjectInfo
->
curGroupOutput
+=
1
;
if
((
pProjectInfo
->
slimit
.
limit
>
0
)
&&
(
pProjectInfo
->
slimit
.
limit
<=
pProjectInfo
->
curGroupOutput
))
{
pOperator
->
status
=
OP_EXEC_DONE
;
blockDataCleanup
(
pRes
);
return
PROJECT_RETRIEVE_DONE
;
}
// reset the value for a new group data
pProjectInfo
->
curOffset
=
0
;
pProjectInfo
->
curOutput
=
0
;
}
// here we reach the start position, according to the limit/offset requirements.
// set current group id
pProjectInfo
->
groupId
=
pBlock
->
info
.
groupId
;
if
(
pProjectInfo
->
curOffset
>=
pRes
->
info
.
rows
)
{
pProjectInfo
->
curOffset
-=
pRes
->
info
.
rows
;
blockDataCleanup
(
pRes
);
return
PROJECT_RETRIEVE_CONTINUE
;
}
else
if
(
pProjectInfo
->
curOffset
<
pRes
->
info
.
rows
&&
pProjectInfo
->
curOffset
>
0
)
{
blockDataTrimFirstNRows
(
pRes
,
pProjectInfo
->
curOffset
);
pProjectInfo
->
curOffset
=
0
;
}
if
(
pRes
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
)
{
// check for the limitation in each group
if
(
pProjectInfo
->
limit
.
limit
>
0
&&
pProjectInfo
->
curOutput
+
pRes
->
info
.
rows
>=
pProjectInfo
->
limit
.
limit
)
{
pRes
->
info
.
rows
=
(
int32_t
)(
pProjectInfo
->
limit
.
limit
-
pProjectInfo
->
curOutput
);
}
return
PROJECT_RETRIEVE_DONE
;
}
else
{
// not full enough, continue to accumulate the output data in the buffer.
return
PROJECT_RETRIEVE_CONTINUE
;
}
}
static
SSDataBlock
*
doProjectOperation
(
SOperatorInfo
*
pOperator
,
bool
*
newgroup
)
{
SProjectOperatorInfo
*
pProjectInfo
=
pOperator
->
info
;
SOptrBasicInfo
*
pInfo
=
&
pProjectInfo
->
binfo
;
...
...
@@ -4953,63 +5033,22 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup)
// }
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pInfo
->
pCtx
,
pBlock
,
TSDB_ORDER_ASC
,
false
);
int32_t
order
=
getTableScanOrder
(
pOperator
->
pDownstream
[
0
]);
setInputDataBlock
(
pOperator
,
pInfo
->
pCtx
,
pBlock
,
order
,
false
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pInfo
->
pRes
->
info
.
rows
+
pBlock
->
info
.
rows
);
projectApplyFunctions
(
pOperator
->
pExpr
,
pInfo
->
pRes
,
pBlock
,
pInfo
->
pCtx
,
pOperator
->
numOfOutput
,
pProjectInfo
->
pPseudoColInfo
);
if
(
pProjectInfo
->
curSOffset
>
0
)
{
if
(
pProjectInfo
->
groupId
==
0
)
{
// it is the first group
pProjectInfo
->
groupId
=
pBlock
->
info
.
groupId
;
blockDataCleanup
(
pInfo
->
pRes
);
continue
;
}
else
if
(
pProjectInfo
->
groupId
!=
pBlock
->
info
.
groupId
)
{
pProjectInfo
->
curSOffset
-=
1
;
// ignore data block in current group
if
(
pProjectInfo
->
curSOffset
>
0
)
{
blockDataCleanup
(
pInfo
->
pRes
);
continue
;
}
}
pProjectInfo
->
groupId
=
pBlock
->
info
.
groupId
;
}
if
(
pProjectInfo
->
groupId
!=
0
&&
pProjectInfo
->
groupId
!=
pBlock
->
info
.
groupId
)
{
pProjectInfo
->
curGroupOutput
+=
1
;
if
((
pProjectInfo
->
slimit
.
limit
>
0
)
&&
(
pProjectInfo
->
slimit
.
limit
<=
pProjectInfo
->
curGroupOutput
))
{
pOperator
->
status
=
OP_EXEC_DONE
;
return
NULL
;
}
// reset the value for a new group data
pProjectInfo
->
curOffset
=
0
;
pProjectInfo
->
curOutput
=
0
;
}
pProjectInfo
->
groupId
=
pBlock
->
info
.
groupId
;
// todo extract method
if
(
pProjectInfo
->
curOffset
<
pInfo
->
pRes
->
info
.
rows
&&
pProjectInfo
->
curOffset
>
0
)
{
blockDataTrimFirstNRows
(
pInfo
->
pRes
,
pProjectInfo
->
curOffset
);
pProjectInfo
->
curOffset
=
0
;
}
else
if
(
pProjectInfo
->
curOffset
>=
pInfo
->
pRes
->
info
.
rows
)
{
pProjectInfo
->
curOffset
-=
pInfo
->
pRes
->
info
.
rows
;
blockDataCleanup
(
pInfo
->
pRes
);
int32_t
status
=
handleLimitOffset
(
pOperator
,
pBlock
);
if
(
status
==
PROJECT_RETRIEVE_CONTINUE
)
{
continue
;
}
if
(
pRes
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
)
{
}
else
if
(
status
==
PROJECT_RETRIEVE_DONE
)
{
break
;
}
}
if
(
pProjectInfo
->
limit
.
limit
>
0
&&
pProjectInfo
->
curOutput
+
pInfo
->
pRes
->
info
.
rows
>=
pProjectInfo
->
limit
.
limit
)
{
pInfo
->
pRes
->
info
.
rows
=
(
int32_t
)(
pProjectInfo
->
limit
.
limit
-
pProjectInfo
->
curOutput
);
}
pProjectInfo
->
curOutput
+=
pInfo
->
pRes
->
info
.
rows
;
// copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
...
...
@@ -5661,8 +5700,7 @@ static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInf
SOperatorInfo
*
createAggregateOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SExprInfo
*
pScalarExprInfo
,
int32_t
numOfScalarExpr
,
SExecTaskInfo
*
pTaskInfo
,
const
STableGroupInfo
*
pTableGroupInfo
)
{
int32_t
numOfScalarExpr
,
SExecTaskInfo
*
pTaskInfo
,
const
STableGroupInfo
*
pTableGroupInfo
)
{
SAggOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SAggOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
...
...
@@ -6319,6 +6357,19 @@ static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOu
static
SArray
*
createSortInfo
(
SNodeList
*
pNodeList
,
SNodeList
*
pNodeListTarget
);
static
SArray
*
createIndexMap
(
SNodeList
*
pNodeList
);
static
SArray
*
extractPartitionColInfo
(
SNodeList
*
pNodeList
);
static
int32_t
initQueryTableDataCond
(
SQueryTableDataCond
*
pCond
,
const
STableScanPhysiNode
*
pTableScanNode
);
static
SInterval
extractIntervalInfo
(
const
STableScanPhysiNode
*
pTableScanNode
)
{
SInterval
interval
=
{
.
interval
=
pTableScanNode
->
interval
,
.
sliding
=
pTableScanNode
->
sliding
,
.
intervalUnit
=
pTableScanNode
->
intervalUnit
,
.
slidingUnit
=
pTableScanNode
->
slidingUnit
,
.
offset
=
pTableScanNode
->
offset
,
};
return
interval
;
}
SOperatorInfo
*
createOperatorTree
(
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
queryId
,
uint64_t
taskId
,
STableGroupInfo
*
pTableGroupInfo
)
{
...
...
@@ -6329,7 +6380,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SScanPhysiNode
*
pScanPhyNode
=
(
SScanPhysiNode
*
)
pPhyNode
;
STableScanPhysiNode
*
pTableScanNode
=
(
STableScanPhysiNode
*
)
pPhyNode
;
int32_t
numOfCols
=
0
;
int32_t
numOfCols
=
0
;
tsdbReaderT
pDataReader
=
doCreateDataReader
(
pTableScanNode
,
pHandle
,
pTableGroupInfo
,
(
uint64_t
)
queryId
,
taskId
);
if
(
pDataReader
==
NULL
&&
terrno
!=
0
)
{
return
NULL
;
...
...
@@ -6339,16 +6390,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
extractColMatchInfo
(
pScanPhyNode
->
pScanCols
,
pScanPhyNode
->
node
.
pOutputDataBlockDesc
,
&
numOfCols
);
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pScanPhyNode
->
node
.
pOutputDataBlockDesc
);
SInterval
interval
=
{
.
interval
=
pTableScanNode
->
interval
,
.
sliding
=
pTableScanNode
->
sliding
,
.
intervalUnit
=
pTableScanNode
->
intervalUnit
,
.
slidingUnit
=
pTableScanNode
->
slidingUnit
,
.
offset
=
pTableScanNode
->
offset
,
};
SQueryTableDataCond
cond
=
{
0
};
int32_t
code
=
initQueryTableDataCond
(
&
cond
,
pTableScanNode
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
NULL
;
}
return
createTableScanOperatorInfo
(
pDataReader
,
pTableScanNode
->
scanSeq
[
0
]
>
0
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
,
numOfCols
,
pTableScanNode
->
dataRequired
,
pTableScanNode
->
scanSeq
[
0
],
pTableScanNode
->
scanSeq
[
1
]
,
pColList
,
SInterval
interval
=
extractIntervalInfo
(
pTableScanNode
);
return
createTableScanOperatorInfo
(
pDataReader
,
&
cond
,
numOfCols
,
pTableScanNode
->
dataRequired
,
pTableScanNode
->
scanSeq
,
pColList
,
pResBlock
,
pScanPhyNode
->
node
.
pConditions
,
&
interval
,
pTableScanNode
->
ratio
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
==
type
)
{
SExchangePhysiNode
*
pExchange
=
(
SExchangePhysiNode
*
)
pPhyNode
;
...
...
@@ -6369,10 +6418,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return
pOperator
;
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN
==
type
)
{
SSystemTableScanPhysiNode
*
pSysScanPhyNode
=
(
SSystemTableScanPhysiNode
*
)
pPhyNode
;
SS
DataBlock
*
pResBlock
=
createResDataBlock
(
pSysScanPhyNode
->
scan
.
node
.
pOutputDataBlockDesc
)
;
SS
canPhysiNode
*
pScanNode
=
&
pSysScanPhyNode
->
scan
;
struct
SScanPhysiNode
*
pScanNode
=
&
pSysScanPhyNode
->
scan
;
SArray
*
colList
=
extractScanColumnId
(
pScanNode
->
pScanCols
);
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pScanNode
->
node
.
pOutputDataBlockDesc
)
;
SArray
*
colList
=
extractScanColumnId
(
pScanNode
->
pScanCols
);
SOperatorInfo
*
pOperator
=
createSysTableScanOperatorInfo
(
pHandle
->
meta
,
pResBlock
,
&
pScanNode
->
tableName
,
pScanNode
->
node
.
pConditions
,
pSysScanPhyNode
->
mgmtEpSet
,
...
...
@@ -6493,38 +6542,47 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return
pOptr
;
}
static
tsdbReaderT
createDataReaderImpl
(
STableScanPhysiNode
*
pTableScanNode
,
STableGroupInfo
*
pGroupInfo
,
void
*
readHandle
,
uint64_t
queryId
,
uint64_t
taskId
)
{
STsdbQueryCond
cond
=
{.
loadExternalRows
=
false
};
cond
.
order
=
pTableScanNode
->
scanSeq
[
0
]
>
0
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
cond
.
numOfCols
=
LIST_LENGTH
(
pTableScanNode
->
scan
.
pScanCols
);
cond
.
colList
=
taosMemoryCalloc
(
cond
.
numOfCols
,
sizeof
(
SColumnInfo
));
if
(
cond
.
colList
==
NULL
)
{
static
int32_t
initQueryTableDataCond
(
SQueryTableDataCond
*
pCond
,
const
STableScanPhysiNode
*
pTableScanNode
)
{
pCond
->
loadExternalRows
=
false
;
pCond
->
order
=
pTableScanNode
->
scanSeq
[
0
]
>
0
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
pCond
->
numOfCols
=
LIST_LENGTH
(
pTableScanNode
->
scan
.
pScanCols
);
pCond
->
colList
=
taosMemoryCalloc
(
pCond
->
numOfCols
,
sizeof
(
SColumnInfo
));
if
(
pCond
->
colList
==
NULL
)
{
terrno
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
NULL
;
return
terrno
;
}
cond
.
twindow
=
pTableScanNode
->
scanRange
;
cond
.
type
=
BLOCK_LOAD_OFFSET_SEQ_ORDER
;
// cond.type = pTableScanNode->scanFlag;
pCond
->
twindow
=
pTableScanNode
->
scanRange
;
#if 1
//todo work around a problem, remove it later
if
((
pCond
->
order
==
TSDB_ORDER_ASC
&&
pCond
->
twindow
.
skey
>
pCond
->
twindow
.
ekey
)
||
(
pCond
->
order
==
TSDB_ORDER_DESC
&&
pCond
->
twindow
.
skey
<
pCond
->
twindow
.
ekey
))
{
TSWAP
(
pCond
->
twindow
.
skey
,
pCond
->
twindow
.
ekey
,
int64_t
);
}
#endif
pCond
->
type
=
BLOCK_LOAD_OFFSET_SEQ_ORDER
;
// pCond->type = pTableScanNode->scanFlag;
int32_t
j
=
0
;
for
(
int32_t
i
=
0
;
i
<
cond
.
numOfCols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pCond
->
numOfCols
;
++
i
)
{
STargetNode
*
pNode
=
(
STargetNode
*
)
nodesListGetNode
(
pTableScanNode
->
scan
.
pScanCols
,
i
);
SColumnNode
*
pColNode
=
(
SColumnNode
*
)
pNode
->
pExpr
;
if
(
pColNode
->
colType
==
COLUMN_TYPE_TAG
)
{
continue
;
}
cond
.
colList
[
j
].
type
=
pColNode
->
node
.
resType
.
type
;
cond
.
colList
[
j
].
bytes
=
pColNode
->
node
.
resType
.
bytes
;
cond
.
colList
[
j
].
colId
=
pColNode
->
colId
;
pCond
->
colList
[
j
].
type
=
pColNode
->
node
.
resType
.
type
;
pCond
->
colList
[
j
].
bytes
=
pColNode
->
node
.
resType
.
bytes
;
pCond
->
colList
[
j
].
colId
=
pColNode
->
colId
;
j
+=
1
;
}
cond
.
numOfCols
=
j
;
return
tsdbQueryTables
(
readHandle
,
&
cond
,
pGroupInfo
,
queryId
,
taskId
)
;
pCond
->
numOfCols
=
j
;
return
TSDB_CODE_SUCCESS
;
}
SArray
*
extractScanColumnId
(
SNodeList
*
pNodeList
)
{
...
...
@@ -6742,7 +6800,13 @@ tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle*
goto
_error
;
}
return
createDataReaderImpl
(
pTableScanNode
,
pTableGroupInfo
,
pHandle
->
reader
,
queryId
,
taskId
);
SQueryTableDataCond
cond
=
{
0
};
code
=
initQueryTableDataCond
(
&
cond
,
pTableScanNode
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
return
tsdbQueryTables
(
pHandle
->
reader
,
&
cond
,
pTableGroupInfo
,
queryId
,
taskId
);
_error:
terrno
=
code
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
cdbfe53c
...
...
@@ -245,6 +245,9 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo,
relocateColumnData
(
pBlock
,
pTableScanInfo
->
pColMatchInfo
,
pCols
);
// reset the block to be 0 by default, this blockId is assigned by physical plan and is used by direct upstream operator.
pBlock
->
info
.
blockId
=
0
;
doFilter
(
pTableScanInfo
->
pFilterNode
,
pBlock
);
if
(
pBlock
->
info
.
rows
==
0
)
{
pCost
->
filterOutBlocks
+=
1
;
...
...
@@ -255,17 +258,15 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo,
return
TSDB_CODE_SUCCESS
;
}
static
void
setupEnvForReverseScan
(
STableScanInfo
*
pTableScanInfo
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
)
{
// reverse order time range
static
void
prepareForDescendingScan
(
STableScanInfo
*
pTableScanInfo
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
)
{
SET_REVERSE_SCAN_FLAG
(
pTableScanInfo
);
switchCtxOrder
(
pCtx
,
numOfOutput
);
SWITCH_ORDER
(
pTableScanInfo
->
order
);
setupQueryRangeForReverseScan
(
pTableScanInfo
);
// setupQueryRangeForReverseScan(pTableScanInfo);
pTableScanInfo
->
times
=
1
;
pTableScanInfo
->
current
=
0
;
pTableScanInfo
->
reverseTimes
=
0
;
STimeWindow
*
pTWindow
=
&
pTableScanInfo
->
cond
.
twindow
;
TSWAP
(
pTWindow
->
skey
,
pTWindow
->
ekey
,
int64_t
)
;
pTableScanInfo
->
cond
.
order
=
TSDB_ORDER_DESC
;
}
static
SSDataBlock
*
doTableScanImpl
(
SOperatorInfo
*
pOperator
,
bool
*
newgroup
)
{
...
...
@@ -294,8 +295,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
continue
;
}
// reset the block to be 0 by default, this blockId is assigned by physical plan and is used by direct upstream operator.
pBlock
->
info
.
blockId
=
0
;
return
pBlock
;
}
...
...
@@ -311,63 +310,67 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
return
NULL
;
}
SResultRowInfo
*
pResultRowInfo
=
pTableScanInfo
->
pResultRowInfo
;
*
newgroup
=
false
;
while
(
pTableScanInfo
->
current
<
pTableScanInfo
->
times
)
{
while
(
pTableScanInfo
->
current
<
pTableScanInfo
->
scanInfo
.
numOfAsc
)
{
SSDataBlock
*
p
=
doTableScanImpl
(
pOperator
,
newgroup
);
if
(
p
!=
NULL
)
{
return
p
;
}
if
(
++
pTableScanInfo
->
current
>=
pTableScanInfo
->
times
)
{
if
(
pTableScanInfo
->
reverseTimes
<=
0
/* || isTsdbCacheLastRow(pTableScanInfo->pTsdbReadHandle)*/
)
{
return
NULL
;
}
else
{
break
;
}
pTableScanInfo
->
current
+=
1
;
if
(
pTableScanInfo
->
current
<
pTableScanInfo
->
scanInfo
.
numOfAsc
)
{
setTaskStatus
(
pTaskInfo
,
TASK_NOT_COMPLETED
);
pTableScanInfo
->
scanFlag
=
REPEAT_SCAN
;
STimeWindow
*
pWin
=
&
pTableScanInfo
->
cond
.
twindow
;
qDebug
(
"%s start to repeat ascending order scan data blocks due to query func required, qrange:%"
PRId64
"-%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pWin
->
skey
,
pWin
->
ekey
);
// do prepare for the next round table scan operation
tsdbResetReadHandle
(
pTableScanInfo
->
dataReader
,
&
pTableScanInfo
->
cond
);
}
}
// do prepare for the next round table scan operation
// STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window);
// tsdbResetQueryHandle(pTableScanInfo->pTsdbReadHandle, &cond);
int32_t
total
=
pTableScanInfo
->
scanInfo
.
numOfAsc
+
pTableScanInfo
->
scanInfo
.
numOfDesc
;
if
(
pTableScanInfo
->
current
<
total
)
{
if
(
pTableScanInfo
->
cond
.
order
==
TSDB_ORDER_ASC
)
{
prepareForDescendingScan
(
pTableScanInfo
,
pTableScanInfo
->
pCtx
,
pTableScanInfo
->
numOfOutput
);
tsdbResetReadHandle
(
pTableScanInfo
->
dataReader
,
&
pTableScanInfo
->
cond
);
}
setTaskStatus
(
pTaskInfo
,
TASK_NOT_COMPLETED
);
pTableScanInfo
->
scanFlag
=
REPEAT_SCAN
;
STimeWindow
*
pWin
=
&
pTableScanInfo
->
cond
.
twindow
;
qDebug
(
"%s start to descending order scan data blocks due to query func required, qrange:%"
PRId64
"-%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pWin
->
skey
,
pWin
->
ekey
);
// if (pResultRowInfo->size > 0) {
// pResultRowInfo->curPos = 0;
// }
while
(
pTableScanInfo
->
current
<
total
)
{
SSDataBlock
*
p
=
doTableScanImpl
(
pOperator
,
newgroup
);
if
(
p
!=
NULL
)
{
return
p
;
}
qDebug
(
"%s start to repeat scan data blocks due to query func required, qrange:%"
PRId64
"-%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pTaskInfo
->
window
.
skey
,
pTaskInfo
->
window
.
ekey
);
}
pTableScanInfo
->
current
+=
1
;
SSDataBlock
*
p
=
NULL
;
// todo refactor
if
(
pTableScanInfo
->
reverseTimes
>
0
)
{
setupEnvForReverseScan
(
pTableScanInfo
,
pTableScanInfo
->
pCtx
,
pTableScanInfo
->
numOfOutput
);
// STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window);
// tsdbResetQueryHandle(pTableScanInfo->pTsdbReadHandle, &cond);
if
(
pTableScanInfo
->
current
<
pTableScanInfo
->
scanInfo
.
numOfAsc
)
{
setTaskStatus
(
pTaskInfo
,
TASK_NOT_COMPLETED
);
pTableScanInfo
->
scanFlag
=
REPEAT_SCAN
;
qDebug
(
"%s start to reverse
scan data blocks due to query func required, qrange:%"
PRId64
"-%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pTaskInfo
->
window
.
skey
,
pTaskInfo
->
window
.
ekey
);
qDebug
(
"%s start to repeat descending order
scan data blocks due to query func required, qrange:%"
PRId64
"-%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pTaskInfo
->
window
.
skey
,
pTaskInfo
->
window
.
ekey
);
if
(
pResultRowInfo
->
size
>
0
)
{
// pResultRowInfo->curPos = pResultRowInfo->size - 1;
// do prepare for the next round table scan operation
tsdbResetReadHandle
(
pTableScanInfo
->
dataReader
,
&
pTableScanInfo
->
cond
);
}
}
p
=
doTableScanImpl
(
pOperator
,
newgroup
);
}
return
p
;
setTaskStatus
(
pTaskInfo
,
TASK_COMPLETED
);
return
NULL
;
}
SOperatorInfo
*
createTableScanOperatorInfo
(
void
*
pDataReader
,
int32_t
order
,
int32_t
numOfOutput
,
int32_t
dataLoadFlag
,
int32_t
repeatTime
,
int32_t
reverseTime
,
SArray
*
pColMatchInfo
,
SSDataBlock
*
pResBlock
,
SNode
*
pCondition
,
SInterval
*
pInterval
,
double
sampleRatio
,
SExecTaskInfo
*
pTaskInfo
)
{
assert
(
repeatTime
>
0
);
SOperatorInfo
*
createTableScanOperatorInfo
(
void
*
pDataReader
,
SQueryTableDataCond
*
pCond
,
int32_t
numOfOutput
,
int32_t
dataLoadFlag
,
const
uint8_t
*
scanInfo
,
SArray
*
pColMatchInfo
,
SSDataBlock
*
pResBlock
,
SNode
*
pCondition
,
SInterval
*
pInterval
,
double
sampleRatio
,
SExecTaskInfo
*
pTaskInfo
)
{
STableScanInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
STableScanInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
...
...
@@ -378,18 +381,19 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, int32_t order, int
return
NULL
;
}
pInfo
->
cond
=
*
pCond
;
pInfo
->
scanInfo
=
(
SScanInfo
)
{.
numOfAsc
=
scanInfo
[
0
],
.
numOfDesc
=
scanInfo
[
1
]};
pInfo
->
interval
=
*
pInterval
;
pInfo
->
sampleRatio
=
sampleRatio
;
pInfo
->
dataBlockLoadFlag
=
dataLoadFlag
;
pInfo
->
pResBlock
=
pResBlock
;
pInfo
->
pFilterNode
=
pCondition
;
pInfo
->
dataReader
=
pDataReader
;
pInfo
->
times
=
repeatTime
;
pInfo
->
reverseTimes
=
reverseTime
;
pInfo
->
order
=
order
;
pInfo
->
current
=
0
;
pInfo
->
scanFlag
=
MAIN_SCAN
;
pInfo
->
pColMatchInfo
=
pColMatchInfo
;
pOperator
->
name
=
"TableScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
;
pOperator
->
blockingOptr
=
false
;
...
...
@@ -410,19 +414,17 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, int32_t order, int
SOperatorInfo
*
createTableSeqScanOperatorInfo
(
void
*
pTsdbReadHandle
)
{
STableScanInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
STableScanInfo
));
pInfo
->
dataReader
=
pTsdbReadHandle
;
pInfo
->
times
=
1
;
pInfo
->
reverseTimes
=
0
;
pInfo
->
current
=
0
;
pInfo
->
dataReader
=
pTsdbReadHandle
;
pInfo
->
current
=
0
;
pInfo
->
prevGroupId
=
-
1
;
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"TableSeqScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
getNextFn
=
doTableScanImpl
;
pOperator
->
name
=
"TableSeqScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
getNextFn
=
doTableScanImpl
;
return
pOperator
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录