Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
8114e1e7
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
8114e1e7
编写于
7月 14, 2020
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[td-225] fix compiler error.
上级
621f842c
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
162 addition
and
95 deletion
+162
-95
src/query/inc/qUtil.h
src/query/inc/qUtil.h
+20
-3
src/query/inc/tsqlfunction.h
src/query/inc/tsqlfunction.h
+1
-4
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+135
-76
src/query/src/qUtil.c
src/query/src/qUtil.c
+3
-10
src/query/src/qresultBuf.c
src/query/src/qresultBuf.c
+1
-1
src/tsdb/src/tsdbRead.c
src/tsdb/src/tsdbRead.c
+2
-1
未找到文件。
src/query/inc/qUtil.h
浏览文件 @
8114e1e7
...
...
@@ -32,14 +32,31 @@ int32_t numOfClosedTimeWindow(SWindowResInfo* pWindowResInfo);
void
closeTimeWindow
(
SWindowResInfo
*
pWindowResInfo
,
int32_t
slot
);
void
closeAllTimeWindow
(
SWindowResInfo
*
pWindowResInfo
);
void
removeRedundantWindow
(
SWindowResInfo
*
pWindowResInfo
,
TSKEY
lastKey
,
int32_t
order
);
SWindowResult
*
getWindowResult
(
SWindowResInfo
*
pWindowResInfo
,
int32_t
slot
);
static
FORCE_INLINE
SWindowResult
*
getWindowResult
(
SWindowResInfo
*
pWindowResInfo
,
int32_t
slot
)
{
assert
(
pWindowResInfo
!=
NULL
&&
slot
>=
0
&&
slot
<
pWindowResInfo
->
size
);
return
&
pWindowResInfo
->
pResult
[
slot
];
}
#define curTimeWindow(_winres) ((_winres)->curIndex)
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pSelectExpr[1].base.arg->argValue.i64:1)
bool
isWindowResClosed
(
SWindowResInfo
*
pWindowResInfo
,
int32_t
slot
);
void
createQueryResultInfo
(
SQuery
*
pQuery
,
SWindowResult
*
pResultRow
,
bool
isSTableQuery
,
SPosInfo
*
posInfo
,
size_t
interBufSize
);
void
createQueryResultInfo
(
SQuery
*
pQuery
,
SWindowResult
*
pResultRow
,
bool
isSTableQuery
,
size_t
interBufSize
);
//char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWindowResult *pResult);
static
FORCE_INLINE
char
*
getPosInResultPage
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
columnIndex
,
SWindowResult
*
pResult
)
{
assert
(
pResult
!=
NULL
&&
pRuntimeEnv
!=
NULL
);
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
tFilePage
*
page
=
GET_RES_BUF_PAGE_BY_ID
(
pRuntimeEnv
->
pResultBuf
,
pResult
->
pos
.
pageId
);
int32_t
realRowId
=
pResult
->
pos
.
rowId
*
GET_ROW_PARAM_FOR_MULTIOUTPUT
(
pQuery
,
pRuntimeEnv
->
topBotQuery
,
pRuntimeEnv
->
stableQuery
);
char
*
getPosInResultPage
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
columnIndex
,
SWindowResult
*
pResult
);
return
((
char
*
)
page
->
data
)
+
pRuntimeEnv
->
offset
[
columnIndex
]
*
pRuntimeEnv
->
numOfRowsPerPage
+
pQuery
->
pSelectExpr
[
columnIndex
].
bytes
*
realRowId
;
}
__filter_func_t
*
getRangeFilterFuncArray
(
int32_t
type
);
__filter_func_t
*
getValueFilterFuncArray
(
int32_t
type
);
...
...
src/query/inc/tsqlfunction.h
浏览文件 @
8114e1e7
...
...
@@ -132,13 +132,10 @@ typedef struct SQLPreAggVal {
typedef
struct
SInterpInfoDetail
{
TSKEY
ts
;
// interp specified timestamp
int8_t
hasResult
;
int8_t
type
;
int8_t
primaryCol
;
}
SInterpInfoDetail
;
typedef
struct
SInterpInfo
{
SInterpInfoDetail
*
pInterpDetail
;
}
SInterpInfo
;
typedef
struct
SResultInfo
{
int8_t
hasResult
;
// result generated, not NULL value
bool
initialized
;
// output buffer has been initialized
...
...
@@ -146,7 +143,7 @@ typedef struct SResultInfo {
bool
superTableQ
;
// is super table query
int32_t
numOfRes
;
// num of output result in current buffer
int32_t
bufLen
;
// buffer size
void
*
interResultBuf
;
// output result buffer
void
*
interResultBuf
;
// output result buffer
}
SResultInfo
;
struct
SQLFunctionCtx
;
...
...
src/query/src/qExecutor.c
浏览文件 @
8114e1e7
...
...
@@ -361,10 +361,6 @@ static bool hasTagValOutput(SQuery* pQuery) {
* @return
*/
static
bool
hasNullValue
(
SColIndex
*
pColIndex
,
SDataStatis
*
pStatis
,
SDataStatis
**
pColStatis
)
{
if
(
TSDB_COL_IS_TAG
(
pColIndex
->
flag
)
||
pColIndex
->
colId
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
{
return
false
;
}
if
(
pStatis
!=
NULL
)
{
*
pColStatis
=
&
pStatis
[
pColIndex
->
colIndex
];
assert
((
*
pColStatis
)
->
colId
==
pColIndex
->
colId
);
...
...
@@ -372,6 +368,10 @@ static bool hasNullValue(SColIndex* pColIndex, SDataStatis *pStatis, SDataStatis
*
pColStatis
=
NULL
;
}
if
(
TSDB_COL_IS_TAG
(
pColIndex
->
flag
)
||
pColIndex
->
colId
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
{
return
false
;
}
if
((
*
pColStatis
)
!=
NULL
&&
(
*
pColStatis
)
->
numOfNull
==
0
)
{
return
false
;
}
...
...
@@ -387,31 +387,33 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
if
(
p1
!=
NULL
)
{
pWindowResInfo
->
curIndex
=
*
p1
;
}
else
{
if
(
masterscan
)
{
// more than the capacity, reallocate the resources
if
(
pWindowResInfo
->
size
>=
pWindowResInfo
->
capacity
)
{
int64_t
newCap
=
pWindowResInfo
->
capacity
*
2
;
char
*
t
=
realloc
(
pWindowResInfo
->
pResult
,
newCap
*
sizeof
(
SWindowResult
));
if
(
t
!=
NULL
)
{
pWindowResInfo
->
pResult
=
(
SWindowResult
*
)
t
;
memset
(
&
pWindowResInfo
->
pResult
[
pWindowResInfo
->
capacity
],
0
,
sizeof
(
SWindowResult
)
*
pWindowResInfo
->
capacity
);
}
else
{
// todo
}
if
(
!
masterscan
)
{
// not master scan, do not add new timewindow
return
NULL
;
}
for
(
int32_t
i
=
pWindowResInfo
->
capacity
;
i
<
newCap
;
++
i
)
{
SPosInfo
pos
=
{
-
1
,
-
1
};
createQueryResultInfo
(
pQuery
,
&
pWindowResInfo
->
pResult
[
i
],
pRuntimeEnv
->
stableQuery
,
&
pos
,
pRuntimeEnv
->
interBufSize
);
}
pWindowResInfo
->
capacity
=
newCap
;
// more than the capacity, reallocate the resources
if
(
pWindowResInfo
->
size
>=
pWindowResInfo
->
capacity
)
{
int64_t
newCap
=
pWindowResInfo
->
capacity
*
1
.
5
;
char
*
t
=
realloc
(
pWindowResInfo
->
pResult
,
newCap
*
sizeof
(
SWindowResult
));
if
(
t
!=
NULL
)
{
pWindowResInfo
->
pResult
=
(
SWindowResult
*
)
t
;
int32_t
inc
=
newCap
-
pWindowResInfo
->
capacity
;
memset
(
&
pWindowResInfo
->
pResult
[
pWindowResInfo
->
capacity
],
0
,
sizeof
(
SWindowResult
)
*
inc
);
}
else
{
// todo
}
// add a new result set for a new group
pWindowResInfo
->
curIndex
=
pWindowResInfo
->
size
++
;
taosHashPut
(
pWindowResInfo
->
hashList
,
pData
,
bytes
,
(
char
*
)
&
pWindowResInfo
->
curIndex
,
sizeof
(
int32_t
));
}
else
{
return
NULL
;
for
(
int32_t
i
=
pWindowResInfo
->
capacity
;
i
<
newCap
;
++
i
)
{
createQueryResultInfo
(
pQuery
,
&
pWindowResInfo
->
pResult
[
i
],
pRuntimeEnv
->
stableQuery
,
pRuntimeEnv
->
interBufSize
)
;
}
pWindowResInfo
->
capacity
=
newCap
;
}
// add a new result set for a new group
pWindowResInfo
->
curIndex
=
pWindowResInfo
->
size
++
;
taosHashPut
(
pWindowResInfo
->
hashList
,
pData
,
bytes
,
(
char
*
)
&
pWindowResInfo
->
curIndex
,
sizeof
(
int32_t
));
}
return
getWindowResult
(
pWindowResInfo
,
pWindowResInfo
->
curIndex
);
...
...
@@ -511,10 +513,11 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes
}
*
newWind
=
true
;
// not assign result buffer yet, add new result buffer
if
(
pWindowRes
->
pos
.
pageId
==
-
1
)
{
int32_t
ret
=
addNewWindowResultBuf
(
pWindowRes
,
pResultBuf
,
sid
,
pRuntimeEnv
->
numOfRowsPerPage
);
if
(
ret
!=
0
)
{
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
-
1
;
}
}
...
...
@@ -531,7 +534,7 @@ static SWindowStatus *getTimeWindowResStatus(SWindowResInfo *pWindowResInfo, int
return
&
pWindowResInfo
->
pResult
[
slot
].
status
;
}
static
int32_t
getForwardStepsInBlock
(
int32_t
numOfRows
,
__block_search_fn_t
searchFn
,
TSKEY
ekey
,
int16_t
pos
,
static
FORCE_INLINE
int32_t
getForwardStepsInBlock
(
int32_t
numOfRows
,
__block_search_fn_t
searchFn
,
TSKEY
ekey
,
int16_t
pos
,
int16_t
order
,
int64_t
*
pData
)
{
int32_t
forwardStep
=
0
;
...
...
@@ -647,12 +650,8 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo
if
(
QUERY_IS_ASC_QUERY
(
pQuery
))
{
if
(
ekey
<
pDataBlockInfo
->
window
.
ekey
)
{
num
=
getForwardStepsInBlock
(
pDataBlockInfo
->
rows
,
searchFn
,
ekey
,
startPos
,
order
,
pPrimaryColumn
);
if
(
num
==
0
)
{
// no qualified data in current block, do not update the lastKey value
assert
(
ekey
<
pPrimaryColumn
[
startPos
]);
}
else
{
if
(
updateLastKey
)
{
item
->
lastKey
=
pPrimaryColumn
[
startPos
+
(
num
-
1
)]
+
step
;
}
if
(
updateLastKey
)
{
// update the last key
item
->
lastKey
=
pPrimaryColumn
[
startPos
+
(
num
-
1
)]
+
step
;
}
}
else
{
num
=
pDataBlockInfo
->
rows
-
startPos
;
...
...
@@ -663,12 +662,8 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo
}
else
{
// desc
if
(
ekey
>
pDataBlockInfo
->
window
.
skey
)
{
num
=
getForwardStepsInBlock
(
pDataBlockInfo
->
rows
,
searchFn
,
ekey
,
startPos
,
order
,
pPrimaryColumn
);
if
(
num
==
0
)
{
// no qualified data in current block, do not update the lastKey value
assert
(
ekey
>
pPrimaryColumn
[
startPos
]);
}
else
{
if
(
updateLastKey
)
{
item
->
lastKey
=
pPrimaryColumn
[
startPos
-
(
num
-
1
)]
+
step
;
}
if
(
updateLastKey
)
{
// update the last key
item
->
lastKey
=
pPrimaryColumn
[
startPos
-
(
num
-
1
)]
+
step
;
}
}
else
{
num
=
startPos
+
1
;
...
...
@@ -912,13 +907,20 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
}
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pQuery
->
order
.
order
);
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
)
&&
tsCols
!=
NULL
)
{
int32_t
offset
=
GET_COL_DATA_POS
(
pQuery
,
0
,
step
);
TSKEY
ts
=
tsCols
[
offset
];
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
)
/* && tsCols != NULL*/
)
{
TSKEY
ts
=
TSKEY_INITIAL_VAL
;
bool
hasTimeWindow
=
false
;
if
(
tsCols
==
NULL
)
{
ts
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
pDataBlockInfo
->
window
.
skey
:
pDataBlockInfo
->
window
.
ekey
;
}
else
{
int32_t
offset
=
GET_COL_DATA_POS
(
pQuery
,
0
,
step
);
ts
=
tsCols
[
offset
];
}
bool
hasTimeWindow
=
false
;
STimeWindow
win
=
getActiveTimeWindow
(
pWindowResInfo
,
ts
,
pQuery
);
if
(
setWindowOutputBufByKey
(
pRuntimeEnv
,
pWindowResInfo
,
pDataBlockInfo
->
tid
,
&
win
,
masterScan
,
&
hasTimeWindow
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
setWindowOutputBufByKey
(
pRuntimeEnv
,
pWindowResInfo
,
pDataBlockInfo
->
tid
,
&
win
,
masterScan
,
&
hasTimeWindow
)
!=
TSDB_CODE_SUCCESS
)
{
tfree
(
sasArray
);
return
;
}
...
...
@@ -927,7 +929,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
int32_t
startPos
=
pQuery
->
pos
;
if
(
hasTimeWindow
)
{
TSKEY
ekey
=
reviseWindowEkey
(
pQuery
,
&
win
);
TSKEY
ekey
=
reviseWindowEkey
(
pQuery
,
&
win
);
forwardStep
=
getNumOfRowsInTimeWindow
(
pQuery
,
pDataBlockInfo
,
tsCols
,
pQuery
->
pos
,
ekey
,
searchFn
,
true
);
SWindowStatus
*
pStatus
=
getTimeWindowResStatus
(
pWindowResInfo
,
curTimeWindow
(
pWindowResInfo
));
...
...
@@ -946,7 +948,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
// null data, failed to allocate more memory buffer
hasTimeWindow
=
false
;
if
(
setWindowOutputBufByKey
(
pRuntimeEnv
,
pWindowResInfo
,
pDataBlockInfo
->
tid
,
&
nextWin
,
masterScan
,
&
hasTimeWindow
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
setWindowOutputBufByKey
(
pRuntimeEnv
,
pWindowResInfo
,
pDataBlockInfo
->
tid
,
&
nextWin
,
masterScan
,
&
hasTimeWindow
)
!=
TSDB_CODE_SUCCESS
)
{
break
;
}
...
...
@@ -957,7 +960,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
TSKEY
ekey
=
reviseWindowEkey
(
pQuery
,
&
nextWin
);
forwardStep
=
getNumOfRowsInTimeWindow
(
pQuery
,
pDataBlockInfo
,
tsCols
,
startPos
,
ekey
,
searchFn
,
true
);
SWindowStatus
*
pStatus
=
getTimeWindowResStatus
(
pWindowResInfo
,
curTimeWindow
(
pWindowResInfo
));
SWindowStatus
*
pStatus
=
getTimeWindowResStatus
(
pWindowResInfo
,
curTimeWindow
(
pWindowResInfo
));
doBlockwiseApplyFunctions
(
pRuntimeEnv
,
pStatus
,
&
nextWin
,
startPos
,
forwardStep
,
tsCols
,
pDataBlockInfo
->
rows
);
}
...
...
@@ -1478,7 +1481,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
qDebug
(
"QInfo:%p setup runtime env"
,
GET_QINFO_ADDR
(
pRuntimeEnv
));
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
pRuntimeEnv
->
resultInfo
=
calloc
(
pQuery
->
numOfOutput
,
sizeof
(
SResultInfo
));
size_t
size
=
pRuntimeEnv
->
interBufSize
+
pQuery
->
numOfOutput
*
sizeof
(
SResultInfo
);
pRuntimeEnv
->
resultInfo
=
calloc
(
1
,
size
);
pRuntimeEnv
->
pCtx
=
(
SQLFunctionCtx
*
)
calloc
(
pQuery
->
numOfOutput
,
sizeof
(
SQLFunctionCtx
));
if
(
pRuntimeEnv
->
resultInfo
==
NULL
||
pRuntimeEnv
->
pCtx
==
NULL
)
{
...
...
@@ -1549,7 +1554,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
}
}
char
*
buf
=
calloc
(
1
,
pRuntimeEnv
->
interBufSize
)
;
char
*
buf
=
(
char
*
)
pRuntimeEnv
->
resultInfo
+
sizeof
(
SResultInfo
)
*
pQuery
->
numOfOutput
;
// set the intermediate result output buffer
setWindowResultInfo
(
pRuntimeEnv
->
resultInfo
,
pQuery
,
pRuntimeEnv
->
stableQuery
,
buf
);
...
...
@@ -1592,7 +1597,6 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
tfree
(
pCtx
->
tagInfo
.
pTagCtxList
);
}
tfree
(
pRuntimeEnv
->
resultInfo
[
0
].
interResultBuf
);
tfree
(
pRuntimeEnv
->
resultInfo
);
tfree
(
pRuntimeEnv
->
pCtx
);
}
...
...
@@ -1608,7 +1612,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
static
void
setQueryKilled
(
SQInfo
*
pQInfo
)
{
pQInfo
->
code
=
TSDB_CODE_TSC_QUERY_CANCELLED
;
}
static
void
setQueryKilled
(
SQInfo
*
pQInfo
)
{
pQInfo
->
code
=
TSDB_CODE_TSC_QUERY_CANCELLED
;}
static
bool
isFixedOutputQuery
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
...
...
@@ -1912,23 +1916,21 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) {
return
num
;
}
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pSelectExpr[1].base.arg->argValue.i64:1)
static
FORCE_INLINE
int32_t
getNumOfRowsInResultPage
(
SQuery
*
pQuery
,
bool
topBotQuery
,
bool
isSTableQuery
)
{
int32_t
rowSize
=
pQuery
->
rowSize
*
GET_ROW_PARAM_FOR_MULTIOUTPUT
(
pQuery
,
topBotQuery
,
isSTableQuery
);
return
(
DEFAULT_INTERN_BUF_PAGE_SIZE
-
sizeof
(
tFilePage
))
/
rowSize
;
}
char
*
getPosInResultPage
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
columnIndex
,
SWindowResult
*
pResult
)
{
assert
(
pResult
!=
NULL
&&
pRuntimeEnv
!=
NULL
);
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
tFilePage
*
page
=
GET_RES_BUF_PAGE_BY_ID
(
pRuntimeEnv
->
pResultBuf
,
pResult
->
pos
.
pageId
);
int32_t
realRowId
=
pResult
->
pos
.
rowId
*
GET_ROW_PARAM_FOR_MULTIOUTPUT
(
pQuery
,
pRuntimeEnv
->
topBotQuery
,
pRuntimeEnv
->
stableQuery
);
return
((
char
*
)
page
->
data
)
+
pRuntimeEnv
->
offset
[
columnIndex
]
*
pRuntimeEnv
->
numOfRowsPerPage
+
pQuery
->
pSelectExpr
[
columnIndex
].
bytes
*
realRowId
;
}
//
char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWindowResult *pResult) {
//
assert(pResult != NULL && pRuntimeEnv != NULL);
//
//
SQuery *pQuery = pRuntimeEnv->pQuery;
//
tFilePage *page = GET_RES_BUF_PAGE_BY_ID(pRuntimeEnv->pResultBuf, pResult->pos.pageId);
//
int32_t realRowId = pResult->pos.rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery);
//
//
return ((char *)page->data) + pRuntimeEnv->offset[columnIndex] * pRuntimeEnv->numOfRowsPerPage +
//
pQuery->pSelectExpr[columnIndex].bytes * realRowId;
//
}
#define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR)
...
...
@@ -1997,23 +1999,80 @@ static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDat
return
false
;
}
#define PT_IN_WINDOW(_p, _w) ((_p) > (_w).skey && (_p) < (_w).ekey)
static
bool
overlapWithTimeWindow
(
SQuery
*
pQuery
,
SDataBlockInfo
*
pBlockInfo
)
{
STimeWindow
w
=
{
0
};
TSKEY
sk
=
MIN
(
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
);
TSKEY
ek
=
MAX
(
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
);
if
(
QUERY_IS_ASC_QUERY
(
pQuery
))
{
getAlignQueryTimeWindow
(
pQuery
,
pBlockInfo
->
window
.
skey
,
sk
,
ek
,
&
w
);
if
(
PT_IN_WINDOW
(
w
.
ekey
,
pBlockInfo
->
window
))
{
return
true
;
}
while
(
1
)
{
GET_NEXT_TIMEWINDOW
(
pQuery
,
&
w
);
if
(
w
.
skey
>
pBlockInfo
->
window
.
skey
)
{
break
;
}
if
(
PT_IN_WINDOW
(
w
.
skey
,
pBlockInfo
->
window
)
||
PT_IN_WINDOW
(
w
.
ekey
,
pBlockInfo
->
window
))
{
return
true
;
}
}
}
else
{
getAlignQueryTimeWindow
(
pQuery
,
pBlockInfo
->
window
.
ekey
,
sk
,
ek
,
&
w
);
if
(
PT_IN_WINDOW
(
w
.
skey
,
pBlockInfo
->
window
))
{
return
true
;
}
while
(
1
)
{
GET_NEXT_TIMEWINDOW
(
pQuery
,
&
w
);
if
(
w
.
ekey
<
pBlockInfo
->
window
.
skey
)
{
break
;
}
if
(
PT_IN_WINDOW
(
w
.
skey
,
pBlockInfo
->
window
)
||
PT_IN_WINDOW
(
w
.
ekey
,
pBlockInfo
->
window
))
{
return
true
;
}
}
}
return
false
;
}
int32_t
loadDataBlockOnDemand
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
void
*
pQueryHandle
,
SDataBlockInfo
*
pBlockInfo
,
SDataStatis
**
pStatis
,
SArray
**
pDataBlock
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
uint32_t
status
=
0
;
if
(
pQuery
->
numOfFilterCols
>
0
)
{
if
(
pQuery
->
numOfFilterCols
>
0
||
pRuntimeEnv
->
pTSBuf
>
0
)
{
status
=
BLK_DATA_ALL_NEEDED
;
}
else
{
// check if this data block is required to load
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
SSqlFuncMsg
*
pSqlFunc
=
&
pQuery
->
pSelectExpr
[
i
].
base
;
int32_t
functionId
=
pSqlFunc
->
functionId
;
int32_t
colId
=
pSqlFunc
->
colInfo
.
colId
;
status
|=
aAggs
[
functionId
].
dataReqFunc
(
&
pRuntimeEnv
->
pCtx
[
i
],
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
colId
);
// Calculate all time windows that are overlapping or contain current data block.
// If current data block is contained by all possible time window, loading current
// data block is not needed.
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
)
&&
overlapWithTimeWindow
(
pQuery
,
pBlockInfo
))
{
status
=
BLK_DATA_ALL_NEEDED
;
}
if
(
pRuntimeEnv
->
pTSBuf
>
0
||
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
status
|=
BLK_DATA_ALL_NEEDED
;
if
(
status
!=
BLK_DATA_ALL_NEEDED
)
{
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
SSqlFuncMsg
*
pSqlFunc
=
&
pQuery
->
pSelectExpr
[
i
].
base
;
int32_t
functionId
=
pSqlFunc
->
functionId
;
int32_t
colId
=
pSqlFunc
->
colInfo
.
colId
;
status
|=
aAggs
[
functionId
].
dataReqFunc
(
&
pRuntimeEnv
->
pCtx
[
i
],
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
colId
);
if
((
status
&
BLK_DATA_ALL_NEEDED
)
!=
0
)
{
break
;
}
}
}
}
...
...
@@ -2954,13 +3013,14 @@ void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) {
}
}
void
createQueryResultInfo
(
SQuery
*
pQuery
,
SWindowResult
*
pResultRow
,
bool
isSTableQuery
,
SPosInfo
*
posInfo
,
size_t
interBufSize
)
{
void
createQueryResultInfo
(
SQuery
*
pQuery
,
SWindowResult
*
pResultRow
,
bool
isSTableQuery
,
size_t
interBufSize
)
{
int32_t
numOfCols
=
pQuery
->
numOfOutput
;
pResultRow
->
resultInfo
=
calloc
((
size_t
)
numOfCols
,
sizeof
(
SResultInfo
));
pResultRow
->
pos
=
*
posInfo
;
size_t
size
=
numOfCols
*
sizeof
(
SResultInfo
)
+
interBufSize
;
pResultRow
->
resultInfo
=
calloc
(
1
,
size
);
pResultRow
->
pos
=
(
SPosInfo
)
{
-
1
,
-
1
};
char
*
buf
=
calloc
(
1
,
interBufSize
);
char
*
buf
=
(
char
*
)
pResultRow
->
resultInfo
+
numOfCols
*
sizeof
(
SResultInfo
);
// set the intermediate result output buffer
setWindowResultInfo
(
pResultRow
->
resultInfo
,
pQuery
,
isSTableQuery
,
buf
);
...
...
@@ -4263,7 +4323,6 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
SDataStatis
*
pStatis
=
NULL
;
SArray
*
pDataBlock
=
NULL
;
if
(
loadDataBlockOnDemand
(
pRuntimeEnv
,
pQueryHandle
,
&
blockInfo
,
&
pStatis
,
&
pDataBlock
)
==
BLK_DATA_DISCARD
)
{
pQuery
->
current
->
lastKey
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
blockInfo
.
window
.
ekey
+
step
:
blockInfo
.
window
.
skey
+
step
;
continue
;
...
...
src/query/src/qUtil.c
浏览文件 @
8114e1e7
...
...
@@ -51,19 +51,17 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun
// use the pointer arraylist
pWindowResInfo
->
pResult
=
calloc
(
threshold
,
sizeof
(
SWindowResult
));
for
(
int32_t
i
=
0
;
i
<
pWindowResInfo
->
capacity
;
++
i
)
{
SPosInfo
posInfo
=
{
-
1
,
-
1
};
createQueryResultInfo
(
pRuntimeEnv
->
pQuery
,
&
pWindowResInfo
->
pResult
[
i
],
pRuntimeEnv
->
stableQuery
,
&
posInfo
,
pRuntimeEnv
->
interBufSize
);
createQueryResultInfo
(
pRuntimeEnv
->
pQuery
,
&
pWindowResInfo
->
pResult
[
i
],
pRuntimeEnv
->
stableQuery
,
pRuntimeEnv
->
interBufSize
);
}
return
TSDB_CODE_SUCCESS
;
}
void
destroyTimeWindowRes
(
SWindowResult
*
pWindowRes
,
int32_t
nOutputCols
)
{
void
destroyTimeWindowRes
(
SWindowResult
*
pWindowRes
)
{
if
(
pWindowRes
==
NULL
)
{
return
;
}
free
(
pWindowRes
->
resultInfo
[
0
].
interResultBuf
);
free
(
pWindowRes
->
resultInfo
);
}
...
...
@@ -78,7 +76,7 @@ void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo, int32_t numOfCols) {
for
(
int32_t
i
=
0
;
i
<
pWindowResInfo
->
capacity
;
++
i
)
{
SWindowResult
*
pResult
=
&
pWindowResInfo
->
pResult
[
i
];
destroyTimeWindowRes
(
pResult
,
numOfCols
);
destroyTimeWindowRes
(
pResult
);
}
taosHashCleanup
(
pWindowResInfo
->
hashList
);
...
...
@@ -217,11 +215,6 @@ void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_
}
}
SWindowResult
*
getWindowResult
(
SWindowResInfo
*
pWindowResInfo
,
int32_t
slot
)
{
assert
(
pWindowResInfo
!=
NULL
&&
slot
>=
0
&&
slot
<
pWindowResInfo
->
size
);
return
&
pWindowResInfo
->
pResult
[
slot
];
}
bool
isWindowResClosed
(
SWindowResInfo
*
pWindowResInfo
,
int32_t
slot
)
{
return
(
getWindowResult
(
pWindowResInfo
,
slot
)
->
status
.
closed
==
true
);
}
...
...
src/query/src/qresultBuf.c
浏览文件 @
8114e1e7
...
...
@@ -86,7 +86,7 @@ static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t numOf
return
TSDB_CODE_SUCCESS
;
}
static
bool
noMoreAvailablePages
(
SDiskbasedResultBuf
*
pResultBuf
)
{
static
FORCE_INLINE
bool
noMoreAvailablePages
(
SDiskbasedResultBuf
*
pResultBuf
)
{
return
(
pResultBuf
->
allocateId
==
pResultBuf
->
numOfPages
-
1
);
}
...
...
src/tsdb/src/tsdbRead.c
浏览文件 @
8114e1e7
...
...
@@ -603,6 +603,8 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
tdInitDataCols
(
pQueryHandle
->
rhelper
.
pDataCols
[
0
],
pSchema
);
tdInitDataCols
(
pQueryHandle
->
rhelper
.
pDataCols
[
1
],
pSchema
);
// int16_t* colIds = pQueryHandle->defaultLoadColumn->pData;
// int32_t ret = tsdbLoadBlockDataCols(&(pQueryHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, QH_GET_NUM_OF_COLS(pQueryHandle));
if
(
tsdbLoadBlockData
(
&
(
pQueryHandle
->
rhelper
),
pBlock
,
pCheckInfo
->
pCompInfo
)
==
0
)
{
SDataBlockLoadInfo
*
pBlockLoadInfo
=
&
pQueryHandle
->
dataBlockLoadInfo
;
...
...
@@ -1361,7 +1363,6 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
return
TSDB_CODE_SUCCESS
;
}
// todo opt for only one table case
static
int32_t
getDataBlocksInFilesImpl
(
STsdbQueryHandle
*
pQueryHandle
,
bool
*
exists
)
{
pQueryHandle
->
numOfBlocks
=
0
;
SQueryFilePos
*
cur
=
&
pQueryHandle
->
cur
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录