Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c13c0f57
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
c13c0f57
编写于
8月 02, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
merge from develop
上级
bdc83f02
9b3ca08c
变更
6
展开全部
隐藏空白更改
内联
并排
Showing
6 changed file
with
1584 addition
and
23 deletion
+1584
-23
src/kit/taosdemo/taosdemo.c
src/kit/taosdemo/taosdemo.c
+2
-0
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+5
-0
src/query/src/qAggMain.c
src/query/src/qAggMain.c
+42
-10
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+211
-11
tests/script/general/parser/interp.sim
tests/script/general/parser/interp.sim
+4
-1
tests/script/general/parser/interp_test.sim
tests/script/general/parser/interp_test.sim
+1320
-1
未找到文件。
src/kit/taosdemo/taosdemo.c
浏览文件 @
c13c0f57
...
...
@@ -75,6 +75,7 @@ extern char configDir[];
#define BUFFER_SIZE TSDB_MAX_ALLOWED_SQL_LEN
#define COND_BUF_LEN (BUFFER_SIZE - 30)
#define COL_BUFFER_LEN ((TSDB_COL_NAME_LEN + 15) * TSDB_MAX_COLUMNS)
#define MAX_USERNAME_SIZE 64
#define MAX_PASSWORD_SIZE 64
#define MAX_HOSTNAME_SIZE 253 // https://man7.org/linux/man-pages/man7/hostname.7.html
...
...
@@ -1413,6 +1414,7 @@ static char *rand_float_str()
return
g_randfloat_buff
+
(
cursor
*
FLOAT_BUFF_LEN
);
}
static
float
rand_float
()
{
static
int
cursor
;
...
...
src/query/inc/qExecutor.h
浏览文件 @
c13c0f57
...
...
@@ -206,6 +206,7 @@ typedef struct SQueryAttr {
bool
stableQuery
;
// super table query or not
bool
topBotQuery
;
// TODO used bitwise flag
bool
interpQuery
;
// denote if this is an interp query
bool
groupbyColumn
;
// denote if this is a groupby normal column query
bool
hasTagResults
;
// if there are tag values in final result or not
bool
timeWindowInterpo
;
// if the time window start/end required interpolation
...
...
@@ -331,6 +332,8 @@ enum OPERATOR_TYPE_E {
OP_Distinct
=
20
,
OP_Join
=
21
,
OP_StateWindow
=
22
,
OP_AllTimeWindow
=
23
,
OP_AllMultiTableTimeInterval
=
24
,
};
typedef
struct
SOperatorInfo
{
...
...
@@ -549,11 +552,13 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
SOperatorInfo
*
createProjectOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createLimitOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
);
SOperatorInfo
*
createTimeIntervalOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createAllTimeIntervalOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createSWindowOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createFillOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createGroupbyOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createMultiTableAggOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createMultiTableTimeIntervalOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createAllMultiTableTimeIntervalOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createTagScanOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createDistinctOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createTableBlockInfoScanOperator
(
void
*
pTsdbQueryHandle
,
SQueryRuntimeEnv
*
pRuntimeEnv
);
...
...
src/query/src/qAggMain.c
浏览文件 @
c13c0f57
...
...
@@ -3708,27 +3708,59 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
}
}
else
{
// no data generated yet
if
(
pCtx
->
size
==
1
)
{
if
(
pCtx
->
size
<
1
)
{
return
;
}
// check the timestamp in input buffer
TSKEY
skey
=
GET_TS_DATA
(
pCtx
,
0
);
TSKEY
ekey
=
GET_TS_DATA
(
pCtx
,
1
);
// no data generated yet
if
(
!
(
skey
<
pCtx
->
startTs
&&
ekey
>
pCtx
->
startTs
))
{
return
;
}
assert
(
pCtx
->
start
.
key
==
INT64_MIN
&&
skey
<
pCtx
->
startTs
&&
ekey
>
pCtx
->
startTs
);
if
(
type
==
TSDB_FILL_PREV
)
{
if
(
skey
>
pCtx
->
startTs
)
{
return
;
}
if
(
pCtx
->
size
>
1
)
{
TSKEY
ekey
=
GET_TS_DATA
(
pCtx
,
1
);
if
(
ekey
>
skey
&&
ekey
<=
pCtx
->
startTs
)
{
skey
=
ekey
;
}
}
assignVal
(
pCtx
->
pOutput
,
pCtx
->
pInput
,
pCtx
->
outputBytes
,
pCtx
->
inputType
);
}
else
if
(
type
==
TSDB_FILL_NEXT
)
{
char
*
val
=
((
char
*
)
pCtx
->
pInput
)
+
pCtx
->
inputBytes
;
TSKEY
ekey
=
skey
;
char
*
val
=
NULL
;
if
(
ekey
<
pCtx
->
startTs
)
{
if
(
pCtx
->
size
>
1
)
{
ekey
=
GET_TS_DATA
(
pCtx
,
1
);
if
(
ekey
<
pCtx
->
startTs
)
{
return
;
}
val
=
((
char
*
)
pCtx
->
pInput
)
+
pCtx
->
inputBytes
;
}
else
{
return
;
}
}
else
{
val
=
(
char
*
)
pCtx
->
pInput
;
}
assignVal
(
pCtx
->
pOutput
,
val
,
pCtx
->
outputBytes
,
pCtx
->
inputType
);
}
else
if
(
type
==
TSDB_FILL_LINEAR
)
{
if
(
pCtx
->
size
<=
1
)
{
return
;
}
TSKEY
ekey
=
GET_TS_DATA
(
pCtx
,
1
);
// no data generated yet
if
(
!
(
skey
<
pCtx
->
startTs
&&
ekey
>
pCtx
->
startTs
))
{
return
;
}
assert
(
pCtx
->
start
.
key
==
INT64_MIN
&&
skey
<
pCtx
->
startTs
&&
ekey
>
pCtx
->
startTs
);
char
*
start
=
GET_INPUT_DATA
(
pCtx
,
0
);
char
*
end
=
GET_INPUT_DATA
(
pCtx
,
1
);
...
...
src/query/src/qExecutor.c
浏览文件 @
c13c0f57
...
...
@@ -448,8 +448,39 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, SQueryRuntim
pResultRowInfo
->
capacity
=
(
int32_t
)
newCapacity
;
}
static
SResultRow
*
doSetResultOutBufByKey
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
,
int64_t
tid
,
char
*
pData
,
int16_t
bytes
,
bool
masterscan
,
uint64_t
tableGroupId
)
{
static
bool
chkResultRowFromKey
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
,
char
*
pData
,
int16_t
bytes
,
bool
masterscan
,
uint64_t
uid
)
{
bool
existed
=
false
;
SET_RES_WINDOW_KEY
(
pRuntimeEnv
->
keyBuf
,
pData
,
bytes
,
uid
);
SResultRow
**
p1
=
(
SResultRow
**
)
taosHashGet
(
pRuntimeEnv
->
pResultRowHashTable
,
pRuntimeEnv
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
// in case of repeat scan/reverse scan, no new time window added.
if
(
QUERY_IS_INTERVAL_QUERY
(
pRuntimeEnv
->
pQuery
))
{
if
(
!
masterscan
)
{
// the *p1 may be NULL in case of sliding+offset exists.
return
p1
!=
NULL
;
}
if
(
p1
!=
NULL
)
{
for
(
int32_t
i
=
pResultRowInfo
->
size
-
1
;
i
>=
0
;
--
i
)
{
if
(
pResultRowInfo
->
pResult
[
i
]
==
(
*
p1
))
{
pResultRowInfo
->
curIndex
=
i
;
existed
=
true
;
break
;
}
}
}
return
existed
;
}
return
p1
!=
NULL
;
}
static
SResultRow
*
doPrepareResultRowFromKey
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
,
char
*
pData
,
int16_t
bytes
,
bool
masterscan
,
uint64_t
uid
)
{
bool
existed
=
false
;
SET_RES_WINDOW_KEY
(
pRuntimeEnv
->
keyBuf
,
pData
,
bytes
,
tableGroupId
);
...
...
@@ -592,6 +623,42 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t
return
w
;
}
// get the correct time window according to the handled timestamp
static
STimeWindow
getCurrentActiveTimeWindow
(
SResultRowInfo
*
pResultRowInfo
,
int64_t
ts
,
SQuery
*
pQuery
)
{
STimeWindow
w
=
{
0
};
if
(
pResultRowInfo
->
curIndex
==
-
1
)
{
// the first window, from the previous stored value
if
(
pResultRowInfo
->
prevSKey
==
TSKEY_INITIAL_VAL
)
{
getInitialStartTimeWindow
(
pQuery
,
ts
,
&
w
);
pResultRowInfo
->
prevSKey
=
w
.
skey
;
}
else
{
w
.
skey
=
pResultRowInfo
->
prevSKey
;
}
if
(
pQuery
->
interval
.
intervalUnit
==
'n'
||
pQuery
->
interval
.
intervalUnit
==
'y'
)
{
w
.
ekey
=
taosTimeAdd
(
w
.
skey
,
pQuery
->
interval
.
interval
,
pQuery
->
interval
.
intervalUnit
,
pQuery
->
precision
)
-
1
;
}
else
{
w
.
ekey
=
w
.
skey
+
pQuery
->
interval
.
interval
-
1
;
}
}
else
{
int32_t
slot
=
curTimeWindowIndex
(
pResultRowInfo
);
SResultRow
*
pWindowRes
=
getResultRow
(
pResultRowInfo
,
slot
);
w
=
pWindowRes
->
win
;
}
/*
* query border check, skey should not be bounded by the query time range, since the value skey will
* be used as the time window index value. So we only change ekey of time window accordingly.
*/
if
(
w
.
ekey
>
pQuery
->
window
.
ekey
&&
QUERY_IS_ASC_QUERY
(
pQuery
))
{
w
.
ekey
=
pQuery
->
window
.
ekey
;
}
return
w
;
}
// a new buffer page for each table. Needs to opt this design
static
int32_t
addNewWindowResultBuf
(
SResultRow
*
pWindowRes
,
SDiskbasedResultBuf
*
pResultBuf
,
int32_t
tid
,
uint32_t
size
)
{
if
(
pWindowRes
->
pageId
!=
-
1
)
{
...
...
@@ -637,8 +704,16 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf
return
0
;
}
static
int32_t
setResultOutputBufByKey
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
,
int64_t
tid
,
STimeWindow
*
win
,
bool
masterscan
,
SResultRow
**
pResult
,
int64_t
tableGroupId
,
SQLFunctionCtx
*
pCtx
,
static
bool
chkWindowOutputBufByKey
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
,
STimeWindow
*
win
,
bool
masterscan
,
SResultRow
**
pResult
,
int64_t
groupId
,
SQLFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
*
rowCellInfoOffset
)
{
assert
(
win
->
skey
<=
win
->
ekey
);
return
chkResultRowFromKey
(
pRuntimeEnv
,
pResultRowInfo
,
(
char
*
)
&
win
->
skey
,
TSDB_KEYSIZE
,
masterscan
,
groupId
);
}
static
int32_t
setWindowOutputBufByKey
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
,
STimeWindow
*
win
,
bool
masterscan
,
SResultRow
**
pResult
,
int64_t
groupId
,
SQLFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
*
rowCellInfoOffset
)
{
assert
(
win
->
skey
<=
win
->
ekey
);
SDiskbasedResultBuf
*
pResultBuf
=
pRuntimeEnv
->
pResultBuf
;
...
...
@@ -707,7 +782,7 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se
}
}
assert
(
forwardStep
>
0
);
assert
(
forwardStep
>
=
0
);
return
forwardStep
;
}
...
...
@@ -764,6 +839,8 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey,
pResultRowInfo
->
curPos
=
i
+
1
;
// current not closed result object
}
}
pResultRowInfo
->
prevSKey
=
pResultRowInfo
->
pResult
[
pResultRowInfo
->
curIndex
]
->
win
.
skey
;
}
static
void
updateResultRowInfoActiveIndex
(
SResultRowInfo
*
pResultRowInfo
,
SQueryAttr
*
pQueryAttr
,
TSKEY
lastKey
)
{
...
...
@@ -813,7 +890,7 @@ static int32_t getNumOfRowsInTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc
}
}
assert
(
num
>
0
);
assert
(
num
>
=
0
);
return
num
;
}
...
...
@@ -973,6 +1050,11 @@ static int32_t getNextQualifiedWindow(SQueryAttr* pQueryAttr, STimeWindow *pNext
}
}
/* interp query with fill should not skip time window */
if
(
pQuery
->
interpQuery
&&
pQuery
->
fillType
!=
TSDB_FILL_NONE
)
{
return
startPos
;
}
/*
* This time window does not cover any data, try next time window,
* this case may happen when the time window is too small
...
...
@@ -1485,6 +1567,81 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
updateResultRowInfoActiveIndex
(
pResultRowInfo
,
pQueryAttr
,
pRuntimeEnv
->
current
->
lastKey
);
}
static
void
hashAllIntervalAgg
(
SOperatorInfo
*
pOperatorInfo
,
SResultRowInfo
*
pResultRowInfo
,
SSDataBlock
*
pSDataBlock
,
int32_t
groupId
)
{
STableIntervalOperatorInfo
*
pInfo
=
(
STableIntervalOperatorInfo
*
)
pOperatorInfo
->
info
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
pOperatorInfo
->
pRuntimeEnv
;
int32_t
numOfOutput
=
pOperatorInfo
->
numOfOutput
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pQuery
->
order
.
order
);
bool
ascQuery
=
QUERY_IS_ASC_QUERY
(
pQuery
);
TSKEY
*
tsCols
=
NULL
;
if
(
pSDataBlock
->
pDataBlock
!=
NULL
)
{
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pSDataBlock
->
pDataBlock
,
0
);
tsCols
=
(
int64_t
*
)
pColDataInfo
->
pData
;
assert
(
tsCols
[
0
]
==
pSDataBlock
->
info
.
window
.
skey
&&
tsCols
[
pSDataBlock
->
info
.
rows
-
1
]
==
pSDataBlock
->
info
.
window
.
ekey
);
}
int32_t
startPos
=
ascQuery
?
0
:
(
pSDataBlock
->
info
.
rows
-
1
);
TSKEY
ts
=
getStartTsKey
(
pQuery
,
&
pSDataBlock
->
info
.
window
,
tsCols
,
pSDataBlock
->
info
.
rows
);
STimeWindow
win
=
getCurrentActiveTimeWindow
(
pResultRowInfo
,
ts
,
pQuery
);
bool
masterScan
=
IS_MASTER_SCAN
(
pRuntimeEnv
);
SResultRow
*
pResult
=
NULL
;
int32_t
forwardStep
=
0
;
while
(
1
)
{
// null data, failed to allocate more memory buffer
int32_t
code
=
setWindowOutputBufByKey
(
pRuntimeEnv
,
pResultRowInfo
,
&
win
,
masterScan
,
&
pResult
,
groupId
,
pInfo
->
pCtx
,
numOfOutput
,
pInfo
->
rowCellInfoOffset
);
if
(
code
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
TSKEY
ekey
=
reviseWindowEkey
(
pQuery
,
&
win
);
forwardStep
=
getNumOfRowsInTimeWindow
(
pQuery
,
&
pSDataBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
true
);
// window start(end) key interpolation
doWindowBorderInterpolation
(
pOperatorInfo
,
pSDataBlock
,
pInfo
->
pCtx
,
pResult
,
&
win
,
startPos
,
forwardStep
);
doApplyFunctions
(
pRuntimeEnv
,
pInfo
->
pCtx
,
&
win
,
startPos
,
forwardStep
,
tsCols
,
pSDataBlock
->
info
.
rows
,
numOfOutput
);
int32_t
prevEndPos
=
(
forwardStep
-
1
)
*
step
+
startPos
;
startPos
=
getNextQualifiedWindow
(
pQuery
,
&
win
,
&
pSDataBlock
->
info
,
tsCols
,
binarySearchForKey
,
prevEndPos
);
if
(
startPos
<
0
)
{
if
(
win
.
skey
<=
pQuery
->
window
.
ekey
)
{
int32_t
code
=
setWindowOutputBufByKey
(
pRuntimeEnv
,
pResultRowInfo
,
&
win
,
masterScan
,
&
pResult
,
groupId
,
pInfo
->
pCtx
,
numOfOutput
,
pInfo
->
rowCellInfoOffset
);
if
(
code
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
startPos
=
pSDataBlock
->
info
.
rows
-
1
;
// window start(end) key interpolation
doWindowBorderInterpolation
(
pOperatorInfo
,
pSDataBlock
,
pInfo
->
pCtx
,
pResult
,
&
win
,
startPos
,
forwardStep
);
doApplyFunctions
(
pRuntimeEnv
,
pInfo
->
pCtx
,
&
win
,
startPos
,
forwardStep
,
tsCols
,
pSDataBlock
->
info
.
rows
,
numOfOutput
);
}
break
;
}
setResultRowInterpo
(
pResult
,
RESULT_ROW_END_INTERP
);
}
if
(
pQuery
->
timeWindowInterpo
)
{
int32_t
rowIndex
=
ascQuery
?
(
pSDataBlock
->
info
.
rows
-
1
)
:
0
;
saveDataBlockLastRow
(
pRuntimeEnv
,
&
pSDataBlock
->
info
,
pSDataBlock
->
pDataBlock
,
rowIndex
);
}
updateResultRowInfoActiveIndex
(
pResultRowInfo
,
pQuery
,
pQuery
->
current
->
lastKey
);
}
static
void
doHashGroupbyAgg
(
SOperatorInfo
*
pOperator
,
SGroupbyOperatorInfo
*
pInfo
,
SSDataBlock
*
pSDataBlock
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
pOperator
->
pRuntimeEnv
;
STableQueryInfo
*
item
=
pRuntimeEnv
->
current
;
...
...
@@ -2918,6 +3075,8 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
// check if this data block is required to load
if
((
*
status
)
!=
BLK_DATA_ALL_NEEDED
)
{
bool
needFilter
=
true
;
// the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet,
// the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer
if
(
QUERY_IS_INTERVAL_QUERY
(
pQueryAttr
))
{
...
...
@@ -2931,6 +3090,18 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
pTableScanInfo
->
pCtx
,
pTableScanInfo
->
numOfOutput
,
pTableScanInfo
->
rowCellInfoOffset
)
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
STimeWindow
win
=
getActiveTimeWindow
(
pTableScanInfo
->
pResultRowInfo
,
k
,
pQuery
);
if
(
pQuery
->
interpQuery
)
{
needFilter
=
chkWindowOutputBufByKey
(
pRuntimeEnv
,
pTableScanInfo
->
pResultRowInfo
,
&
win
,
masterScan
,
&
pResult
,
groupId
,
pTableScanInfo
->
pCtx
,
pTableScanInfo
->
numOfOutput
,
pTableScanInfo
->
rowCellInfoOffset
);
}
else
{
if
(
setWindowOutputBufByKey
(
pRuntimeEnv
,
pTableScanInfo
->
pResultRowInfo
,
&
win
,
masterScan
,
&
pResult
,
groupId
,
pTableScanInfo
->
pCtx
,
pTableScanInfo
->
numOfOutput
,
pTableScanInfo
->
rowCellInfoOffset
)
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
}
else
if
(
pQueryAttr
->
stableQuery
&&
(
!
pQueryAttr
->
tsCompQuery
)
&&
(
!
pQueryAttr
->
diffQuery
))
{
// stable aggregate, not interval aggregate or normal column aggregate
doSetTableGroupOutputBuf
(
pRuntimeEnv
,
pTableScanInfo
->
pResultRowInfo
,
pTableScanInfo
->
pCtx
,
...
...
@@ -2938,7 +3109,11 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
pRuntimeEnv
->
current
->
groupIndex
);
}
(
*
status
)
=
doFilterByBlockTimeWindow
(
pTableScanInfo
,
pBlock
);
if
(
needFilter
)
{
(
*
status
)
=
doFilterByBlockTimeWindow
(
pTableScanInfo
,
pBlock
);
}
else
{
(
*
status
)
=
BLK_DATA_ALL_NEEDED
;
}
}
SDataBlockInfo
*
pBlockInfo
=
&
pBlock
->
info
;
...
...
@@ -4546,6 +4721,7 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
SQueryAttr
*
pQueryAttr
=
pQInfo
->
runtimeEnv
.
pQueryAttr
;
pQueryAttr
->
tsdb
=
tsdb
;
if
(
tsdb
!=
NULL
)
{
int32_t
code
=
setupQueryHandle
(
tsdb
,
pRuntimeEnv
,
pQInfo
->
qId
,
pQueryAttr
->
stableQuery
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -4954,7 +5130,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
pTableScanInfo
->
pCtx
=
pAggInfo
->
binfo
.
pCtx
;
pTableScanInfo
->
pResultRowInfo
=
&
pAggInfo
->
binfo
.
resultRowInfo
;
pTableScanInfo
->
rowCellInfoOffset
=
pAggInfo
->
binfo
.
rowCellInfoOffset
;
}
else
if
(
pDownstream
->
operatorType
==
OP_TimeWindow
)
{
}
else
if
(
pDownstream
->
operatorType
==
OP_TimeWindow
||
pDownstream
->
operatorType
==
OP_AllTimeWindow
)
{
STableIntervalOperatorInfo
*
pIntervalInfo
=
pDownstream
->
info
;
pTableScanInfo
->
pCtx
=
pIntervalInfo
->
pCtx
;
...
...
@@ -4968,7 +5144,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
pTableScanInfo
->
pResultRowInfo
=
&
pGroupbyInfo
->
binfo
.
resultRowInfo
;
pTableScanInfo
->
rowCellInfoOffset
=
pGroupbyInfo
->
binfo
.
rowCellInfoOffset
;
}
else
if
(
pDownstream
->
operatorType
==
OP_MultiTableTimeInterval
)
{
}
else
if
(
pDownstream
->
operatorType
==
OP_MultiTableTimeInterval
||
pDownstream
->
operatorType
==
OP_AllMultiTableTimeInterval
)
{
STableIntervalOperatorInfo
*
pInfo
=
pDownstream
->
info
;
pTableScanInfo
->
pCtx
=
pInfo
->
pCtx
;
...
...
@@ -5642,7 +5818,6 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
return
pIntervalInfo
->
pRes
;
}
static
void
doStateWindowAggImpl
(
SOperatorInfo
*
pOperator
,
SStateWindowOperatorInfo
*
pInfo
,
SSDataBlock
*
pSDataBlock
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
pOperator
->
pRuntimeEnv
;
STableQueryInfo
*
item
=
pRuntimeEnv
->
current
;
...
...
@@ -6285,8 +6460,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
appendUpstream
(
pOperator
,
upstream
);
return
pOperator
;
}
SOperatorInfo
*
createSWindowOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SSWindowOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SSWindowOperatorInfo
));
...
...
@@ -6337,6 +6512,31 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti
return
pOperator
;
}
SOperatorInfo
*
createAllMultiTableTimeIntervalOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
STableIntervalOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STableIntervalOperatorInfo
));
pInfo
->
pCtx
=
createSQLFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
rowCellInfoOffset
);
pInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
initResultRowInfo
(
&
pInfo
->
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"AllMultiTableTimeIntervalOperator"
;
pOperator
->
operatorType
=
OP_AllMultiTableTimeInterval
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
upstream
=
upstream
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
info
=
pInfo
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
exec
=
doAllSTableIntervalAgg
;
pOperator
->
cleanup
=
destroyBasicOperatorInfo
;
return
pOperator
;
}
SOperatorInfo
*
createGroupbyOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SGroupbyOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SGroupbyOperatorInfo
));
pInfo
->
colIndex
=
-
1
;
// group by column index
...
...
tests/script/general/parser/interp.sim
浏览文件 @
c13c0f57
...
...
@@ -55,6 +55,9 @@ while $i < $halfNum
endw
print ====== tables created
sql create table ap1 (ts timestamp, pav float);
sql INSERT INTO ap1 VALUES ('2021-07-25 02:19:54.100',1) ('2021-07-25 02:19:54.200',2) ('2021-07-25 02:19:54.300',3) ('2021-07-25 02:19:56.500',4) ('2021-07-25 02:19:57.500',5) ('2021-07-25 02:19:57.600',6) ('2021-07-25 02:19:57.900',7) ('2021-07-25 02:19:58.100',8) ('2021-07-25 02:19:58.300',9) ('2021-07-25 02:19:59.100',10) ('2021-07-25 02:19:59.300',11) ('2021-07-25 02:19:59.500',12) ('2021-07-25 02:19:59.700',13) ('2021-07-25 02:19:59.900',14) ('2021-07-25 02:20:05.000', 20) ('2021-07-25 02:25:00.000', 10000);
run general/parser/interp_test.sim
print ================== restart server to commit data into disk
...
...
@@ -65,4 +68,4 @@ print ================== server restart completed
run general/parser/interp_test.sim
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
tests/script/general/parser/interp_test.sim
浏览文件 @
c13c0f57
此差异已折叠。
点击以展开。
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录