Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
921eaa1c
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
921eaa1c
编写于
2月 26, 2021
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[td-2895] refactor.
上级
f54f210e
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
207 addition
and
149 deletion
+207
-149
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+19
-11
src/query/inc/qUtil.h
src/query/inc/qUtil.h
+2
-0
src/query/src/qAggMain.c
src/query/src/qAggMain.c
+43
-5
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+121
-125
src/query/src/qFill.c
src/query/src/qFill.c
+11
-6
src/query/src/qUtil.c
src/query/src/qUtil.c
+9
-1
src/query/src/queryMain.c
src/query/src/queryMain.c
+1
-1
tests/script/general/parser/limit1_stb.sim
tests/script/general/parser/limit1_stb.sim
+1
-0
未找到文件。
src/query/inc/qExecutor.h
浏览文件 @
921eaa1c
...
...
@@ -274,19 +274,27 @@ typedef struct SQueryRuntimeEnv {
STableGroupInfo
tableqinfoGroupInfo
;
// this is a group array list, including SArray<STableQueryInfo*> structure
struct
SOperatorInfo
*
proot
;
SGroupResInfo
groupResInfo
;
int64_t
currentOffset
;
// dynamic offset value
}
SQueryRuntimeEnv
;
enum
{
OP_IN_EXECUTING
=
1
,
OP_RES_TO_RETURN
=
2
,
OP_EXEC_DONE
=
3
,
};
typedef
struct
SOperatorInfo
{
char
*
nam
e
;
bool
blockingOptr
;
bool
completed
;
void
*
info
;
SExprInfo
*
pExpr
;
int32_t
numOfOutput
;
S
QueryRuntimeEnv
*
pRuntimeEnv
;
struct
SOperatorInfo
*
upstream
;
uint8_t
operatorTyp
e
;
bool
blockingOptr
;
// block operator or not
uint8_t
completed
;
// denote if current operator is completed
uint32_t
seed
;
// operator seed
int32_t
numOfOutput
;
// number of columns of the current operator results
char
*
name
;
// name, used to show the query execution plan
void
*
info
;
// extension attribution
S
ExprInfo
*
pExpr
;
SQueryRuntimeEnv
*
pRuntimeEnv
;
struct
SOperatorInfo
*
upstream
;
__operator_fn_t
exec
;
__optr_cleanup_fn_t
cleanup
;
}
SOperatorInfo
;
...
...
@@ -391,11 +399,11 @@ typedef struct SLimitOperatorInfo {
typedef
struct
SOffsetOperatorInfo
{
int64_t
offset
;
int64_t
currentOffset
;
}
SOffsetOperatorInfo
;
typedef
struct
SFillOperatorInfo
{
SSDataBlock
*
pRes
;
SSDataBlock
*
pRes
;
int64_t
totalInputRows
;
}
SFillOperatorInfo
;
typedef
struct
SHashGroupbyOperatorInfo
{
...
...
src/query/inc/qUtil.h
浏览文件 @
921eaa1c
...
...
@@ -84,7 +84,9 @@ void freeInterResult(void* param);
void
initGroupResInfo
(
SGroupResInfo
*
pGroupResInfo
,
SResultRowInfo
*
pResultInfo
,
int32_t
offset
);
void
cleanupGroupResInfo
(
SGroupResInfo
*
pGroupResInfo
);
bool
hasRemainDataInCurrentGroup
(
SGroupResInfo
*
pGroupResInfo
);
bool
hasRemainData
(
SGroupResInfo
*
pGroupResInfo
);
bool
incNextGroup
(
SGroupResInfo
*
pGroupResInfo
);
int32_t
getNumOfTotalRes
(
SGroupResInfo
*
pGroupResInfo
);
...
...
src/query/src/qAggMain.c
浏览文件 @
921eaa1c
...
...
@@ -1439,7 +1439,20 @@ static void stddev_function_f(SQLFunctionCtx *pCtx, int32_t index) {
// the second stage to calculate standard deviation
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SStddevInfo
*
pStd
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
if
(
pCtx
->
currentStage
==
REPEAT_SCAN
&&
pStd
->
stage
==
0
)
{
pStd
->
stage
++
;
avg_finalizer
(
pCtx
);
pResInfo
->
initialized
=
true
;
// set it initialized to avoid re-initialization
// save average value into tmpBuf, for second stage scan
SAvgInfo
*
pAvg
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
pStd
->
avg
=
GET_DOUBLE_VAL
(
pCtx
->
pOutput
);
assert
((
isnan
(
pAvg
->
sum
)
&&
pAvg
->
num
==
0
)
||
(
pStd
->
num
==
pAvg
->
num
&&
pStd
->
avg
==
pAvg
->
sum
));
}
/* the first stage is to calculate average value */
if
(
pStd
->
stage
==
0
)
{
avg_function_f
(
pCtx
,
index
);
...
...
@@ -2695,19 +2708,34 @@ static void percentile_function(SQLFunctionCtx *pCtx) {
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SPercentileInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
if
(
pCtx
->
currentStage
==
REPEAT_SCAN
&&
pInfo
->
stage
==
0
)
{
// all data are null, set it completed
if
(
pInfo
->
numOfElems
==
0
)
{
pResInfo
->
complete
=
true
;
}
else
{
pInfo
->
pMemBucket
=
tMemBucketCreate
(
pCtx
->
inputBytes
,
pCtx
->
inputType
,
pInfo
->
minval
,
pInfo
->
maxval
);
}
pInfo
->
stage
+=
1
;
}
// the first stage, only acquire the min/max value
if
(
pInfo
->
stage
==
0
)
{
if
(
pCtx
->
preAggVals
.
isSet
)
{
double
tmin
=
0
.
0
,
tmax
=
0
.
0
;
if
(
pCtx
->
inputType
>=
TSDB_DATA_TYPE_TINYINT
&&
pCtx
->
inputType
<=
TSDB_DATA_TYPE_BIGINT
)
{
if
(
IS_SIGNED_NUMERIC_TYPE
(
pCtx
->
inputType
)
)
{
tmin
=
(
double
)
GET_INT64_VAL
(
&
pCtx
->
preAggVals
.
statis
.
min
);
tmax
=
(
double
)
GET_INT64_VAL
(
&
pCtx
->
preAggVals
.
statis
.
max
);
}
else
if
(
pCtx
->
inputType
==
TSDB_DATA_TYPE_DOUBLE
||
pCtx
->
inputType
==
TSDB_DATA_TYPE_FLOAT
)
{
}
else
if
(
IS_FLOAT_TYPE
(
pCtx
->
inputType
)
)
{
tmin
=
GET_DOUBLE_VAL
(
&
pCtx
->
preAggVals
.
statis
.
min
);
tmax
=
GET_DOUBLE_VAL
(
&
pCtx
->
preAggVals
.
statis
.
max
);
}
else
if
(
IS_UNSIGNED_NUMERIC_TYPE
(
pCtx
->
inputType
))
{
tmin
=
(
double
)
GET_UINT64_VAL
(
pCtx
->
preAggVals
.
statis
.
min
);
tmax
=
(
double
)
GET_UINT64_VAL
(
pCtx
->
preAggVals
.
statis
.
max
);
}
else
{
assert
(
true
);
}
if
(
GET_DOUBLE_VAL
(
&
pInfo
->
minval
)
>
tmin
)
{
SET_DOUBLE_VAL
(
&
pInfo
->
minval
,
tmin
);
}
...
...
@@ -2764,10 +2792,20 @@ static void percentile_function_f(SQLFunctionCtx *pCtx, int32_t index) {
}
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SPercentileInfo
*
pInfo
=
(
SPercentileInfo
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
if
(
pInfo
->
stage
==
0
)
{
if
(
pCtx
->
currentStage
==
REPEAT_SCAN
&&
pInfo
->
stage
==
0
)
{
// all data are null, set it completed
if
(
pInfo
->
numOfElems
==
0
)
{
pResInfo
->
complete
=
true
;
}
else
{
pInfo
->
pMemBucket
=
tMemBucketCreate
(
pCtx
->
inputBytes
,
pCtx
->
inputType
,
pInfo
->
minval
,
pInfo
->
maxval
);
}
pInfo
->
stage
+=
1
;
}
if
(
pInfo
->
stage
==
0
)
{
double
v
=
0
;
GET_TYPED_DATA
(
v
,
double
,
pCtx
->
inputType
,
pData
);
...
...
src/query/src/qExecutor.c
浏览文件 @
921eaa1c
...
...
@@ -189,31 +189,20 @@ static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput);
static
void
destroyArithOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
);
static
SOperatorInfo
*
createArithOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
static
SOperatorInfo
*
createLimitOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
);
static
SOperatorInfo
*
createOffsetOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
);
static
SOperatorInfo
*
createIntervalAggOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
static
SOperatorInfo
*
createFillOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
static
SOperatorInfo
*
createHashGroupbyOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
static
SOperatorInfo
*
createFillOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
static
SOperatorInfo
*
createHashGroupbyOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
static
SOperatorInfo
*
createStableAggOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
static
SOperatorInfo
*
createStableIntervalOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
static
SOperatorInfo
*
createTagScanOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
);
static
SOperatorInfo
*
createTagScanOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
static
int32_t
doCopyToSData_rv
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SGroupResInfo
*
pGroupResInfo
,
int32_t
orderType
,
SSDataBlock
*
pBlock
);
static
char
*
getGroupbyColumnData
(
SQuery
*
pQuery
,
int16_t
*
type
,
int16_t
*
bytes
,
SArray
*
pDataBlock
);
static
int32_t
getGroupbyColumnData_rv
(
SSqlGroupbyExpr
*
pGroupbyExpr
,
SSDataBlock
*
pDataBlock
);
//static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo, char *pData, int16_t type, int16_t bytes, int32_t groupIndex);
static
int32_t
setGroupResultOutputBuf_rv
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
,
SQLFunctionCtx
*
pCtx
,
int32_t
numOfCols
,
char
*
pData
,
int16_t
type
,
int16_t
bytes
,
int32_t
groupIndex
,
int32_t
*
offset
);
static
void
destroyOperatorInfo
(
SOperatorInfo
*
pOperator
);
...
...
@@ -221,6 +210,7 @@ void initCtxOutputBuf_rv(SQLFunctionCtx* pCtx, int32_t size);
void
getAlignQueryTimeWindow
(
SQuery
*
pQuery
,
int64_t
key
,
int64_t
keyFirst
,
int64_t
keyLast
,
STimeWindow
*
win
);
// setup the output buffer
// TODO prepare the output buffer dynamically
static
SSDataBlock
*
createOutputBuf
(
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SSDataBlock
*
res
=
calloc
(
1
,
sizeof
(
SSDataBlock
));
res
->
info
.
numOfCols
=
numOfOutput
;
...
...
@@ -2163,7 +2153,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pRuntimeEnv
->
pool
=
initResultRowPool
(
getResultRowSize
(
pRuntimeEnv
));
pRuntimeEnv
->
prevRow
=
malloc
(
POINTER_BYTES
*
pQuery
->
numOfCols
+
pQuery
->
srcRowSize
);
pRuntimeEnv
->
tagVal
=
malloc
(
pQuery
->
tagLen
);
// pRuntimeEnv->rowCellInfoOffset = calloc(pQuery->numOfOutput, sizeof(int32_t));
pRuntimeEnv
->
currentOffset
=
pQuery
->
limit
.
offset
;
pRuntimeEnv
->
sasArray
=
calloc
(
pQuery
->
numOfOutput
,
sizeof
(
SArithmeticSupport
));
if
(
/*pRuntimeEnv->rowCellInfoOffset == NULL || */
pRuntimeEnv
->
sasArray
==
NULL
||
...
...
@@ -2256,10 +2247,7 @@ static void doFreeQueryHandle(SQInfo* pQInfo) {
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
tsdbCleanupQueryHandle
(
pRuntimeEnv
->
pQueryHandle
);
// tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
pRuntimeEnv
->
pQueryHandle
=
NULL
;
// pRuntimeEnv->pSecQueryHandle = NULL;
SMemRef
*
pMemRef
=
&
pQuery
->
memRef
;
assert
(
pMemRef
->
ref
==
0
&&
pMemRef
->
imem
==
NULL
&&
pMemRef
->
mem
==
NULL
);
...
...
@@ -3394,9 +3382,10 @@ void UNUSED_FUNC displayInterResult(tFilePage **pdata, SQueryRuntimeEnv* pRuntim
void
copyResToQueryResultBuf_rv
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
threshold
,
SSDataBlock
*
pBlock
,
int32_t
*
offset
)
{
SGroupResInfo
*
pGroupResInfo
=
&
pRuntimeEnv
->
groupResInfo
;
pBlock
->
info
.
rows
=
0
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
while
(
pGroupResInfo
->
currentGroup
<
pGroupResInfo
->
totalGroup
)
{
while
(
pGroupResInfo
->
currentGroup
<
pGroupResInfo
->
totalGroup
)
{
// all results in current group have been returned to client, try next group
if
((
pGroupResInfo
->
pRows
==
NULL
)
||
taosArrayGetSize
(
pGroupResInfo
->
pRows
)
==
0
)
{
assert
(
pGroupResInfo
->
index
==
0
);
...
...
@@ -3408,18 +3397,19 @@ void copyResToQueryResultBuf_rv(SQueryRuntimeEnv* pRuntimeEnv, int32_t threshold
doCopyToSData_rv
(
pRuntimeEnv
,
pGroupResInfo
,
TSDB_ORDER_ASC
,
pBlock
);
// current data are all dumped to result buffer, clear it
if
(
!
hasRemainData
(
pGroupResInfo
))
{
if
(
!
hasRemainData
InCurrentGroup
(
pGroupResInfo
))
{
cleanupGroupResInfo
(
pGroupResInfo
);
if
(
!
incNextGroup
(
pGroupResInfo
))
{
SET_STABLE_QUERY_OVER
(
pRuntimeEnv
);
}
}
// enough results in data buffer, return
if
(
pBlock
->
info
.
rows
>=
threshold
)
{
break
;
// enough results in data buffer, return
if
(
pBlock
->
info
.
rows
>=
threshold
)
{
break
;
}
}
}
}
static
void
updateTableQueryInfoForReverseScan
(
SQuery
*
pQuery
,
STableQueryInfo
*
pTableQueryInfo
)
{
...
...
@@ -3536,9 +3526,8 @@ int32_t initResultRow(SResultRow *pResultRow) {
}
void
setDefaultOutputBuf
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SQLFunctionCtx
*
pCtx
,
SResultRowInfo
*
pResultRowInfo
,
SSDataBlock
*
pDataBlock
,
int32_t
*
rowCellInfoOffset
)
{
SSDataBlock
*
pDataBlock
,
int32_t
*
rowCellInfoOffset
,
int64_t
uid
)
{
int32_t
tid
=
0
;
int64_t
uid
=
0
;
SResultRow
*
pRow
=
doPrepareResultRowFromKey
(
pRuntimeEnv
,
pResultRowInfo
,
(
char
*
)
&
tid
,
sizeof
(
tid
),
true
,
uid
);
for
(
int32_t
i
=
0
;
i
<
pDataBlock
->
info
.
numOfCols
;
++
i
)
{
...
...
@@ -3718,8 +3707,6 @@ void prepareRepeatTableScan(SQueryRuntimeEnv* pRuntimeEnv) {
void
initCtxOutputBuf_rv
(
SQLFunctionCtx
*
pCtx
,
int32_t
size
)
{
for
(
int32_t
j
=
0
;
j
<
size
;
++
j
)
{
pCtx
[
j
].
currentStage
=
0
;
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
&
pCtx
[
j
]);
if
(
pResInfo
->
initialized
)
{
continue
;
...
...
@@ -4568,8 +4555,9 @@ static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pG
static
void
toSSDataBlock
(
SGroupResInfo
*
pGroupResInfo
,
SQueryRuntimeEnv
*
pRuntimeEnv
,
SSDataBlock
*
pBlock
)
{
assert
(
pGroupResInfo
->
currentGroup
<=
pGroupResInfo
->
totalGroup
);
pBlock
->
info
.
rows
=
0
;
if
(
!
hasRemainData
(
pGroupResInfo
))
{
if
(
!
hasRemainData
InCurrentGroup
(
pGroupResInfo
))
{
return
;
}
...
...
@@ -4659,7 +4647,7 @@ bool hasNotReturnedResults(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupR
int32_t
numOfTotal
=
(
int32_t
)
getNumOfResultsAfterFillGap
(
pFillInfo
,
pQuery
->
window
.
ekey
,
(
int32_t
)
pQuery
->
rec
.
capacity
);
return
numOfTotal
>
0
;
}
else
{
// there are results waiting for returned to client.
if
(
Q_STATUS_EQUAL
(
pQuery
->
status
,
QUERY_COMPLETED
)
&&
hasRemainData
(
pGroupResInfo
)
&&
if
(
Q_STATUS_EQUAL
(
pQuery
->
status
,
QUERY_COMPLETED
)
&&
hasRemainData
InCurrentGroup
(
pGroupResInfo
)
&&
(
pQuery
->
groupbyColumn
||
QUERY_IS_INTERVAL_QUERY
(
pQuery
)))
{
return
true
;
}
...
...
@@ -5212,18 +5200,17 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
pQuery
->
tsdb
=
tsdb
;
pQuery
->
vgId
=
vgId
;
pQuery
->
stableQuery
=
isSTableQuery
;
pQuery
->
groupbyColumn
=
isGroupbyColumn
(
pQuery
->
pGroupbyExpr
);
pRuntimeEnv
->
groupResInfo
.
totalGroup
=
isSTableQuery
?
GET_NUM_OF_TABLEGROUP
(
pRuntimeEnv
)
:
0
;
pRuntimeEnv
->
pQuery
=
pQuery
;
pRuntimeEnv
->
pTsBuf
=
pTsBuf
;
pRuntimeEnv
->
cur
.
vgroupIndex
=
-
1
;
pQuery
->
stableQuery
=
isSTableQuery
;
pQuery
->
groupbyColumn
=
isGroupbyColumn
(
pQuery
->
pGroupbyExpr
);
if
(
onlyQueryTags
(
pQuery
))
{
pRuntimeEnv
->
proot
=
createTagScanOperatorInfo
(
pRuntimeEnv
);
pRuntimeEnv
->
proot
=
createTagScanOperatorInfo
(
pRuntimeEnv
,
pQuery
->
pExpr1
,
pQuery
->
numOfOutput
);
}
else
if
(
needReverseScan
(
pQuery
))
{
pRuntimeEnv
->
pi
=
createBiDirectionTableScanInfo
(
pRuntimeEnv
->
pQueryHandle
,
pRuntimeEnv
,
getNumOfScanTimes
(
pQuery
),
1
);
}
else
{
...
...
@@ -5810,7 +5797,7 @@ static UNUSED_FUNC void sequentialTableProcess(SQInfo *pQInfo) {
* If the subgroup index is larger than 0, results generated by group by tbname,k is existed.
* we need to return it to client in the first place.
*/
if (hasRemainData(&pRuntimeEnv->groupResInfo)) {
if (hasRemainData
InCurrentGroup
(&pRuntimeEnv->groupResInfo)) {
copyToOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo);
pQuery->rec.total += pQuery->rec.rows;
...
...
@@ -6189,7 +6176,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"SeqScanTableOp"
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
completed
=
false
;
pOperator
->
completed
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
pOperator
->
numOfOutput
=
pRuntimeEnv
->
pQuery
->
numOfCols
;
pOperator
->
exec
=
doTableScan
;
...
...
@@ -6268,14 +6255,10 @@ static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) {
return
pTableScanInfo
->
order
;
}
//static int32_t getTableScanFlag(STableScanInfo* pTableScanInfo) {
// return pTableScanInfo->
//}
// this is a blocking operator
static
SSDataBlock
*
doAggregate
(
void
*
param
)
{
SOperatorInfo
*
pOperator
=
(
SOperatorInfo
*
)
param
;
if
(
pOperator
->
completed
)
{
if
(
pOperator
->
completed
==
OP_EXEC_DONE
)
{
return
NULL
;
}
...
...
@@ -6307,7 +6290,7 @@ static SSDataBlock* doAggregate(void* param) {
doAggregateImpl
(
pOperator
,
pQuery
->
window
.
skey
,
pAggInfo
->
pCtx
,
pBlock
);
}
pOperator
->
completed
=
true
;
pOperator
->
completed
=
OP_EXEC_DONE
;
setQueryStatus
(
pRuntimeEnv
->
pQuery
,
QUERY_COMPLETED
);
finalizeQueryResult_rv
(
pOperator
,
pAggInfo
->
pCtx
,
&
pAggInfo
->
resultRowInfo
,
pAggInfo
->
rowCellInfoOffset
);
...
...
@@ -6318,18 +6301,18 @@ static SSDataBlock* doAggregate(void* param) {
static
SSDataBlock
*
doSTableAggregate
(
void
*
param
)
{
SOperatorInfo
*
pOperator
=
(
SOperatorInfo
*
)
param
;
if
(
pOperator
->
completed
)
{
if
(
pOperator
->
completed
==
OP_EXEC_DONE
)
{
return
NULL
;
}
SAggOperatorInfo
*
pAggInfo
=
pOperator
->
info
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
pOperator
->
pRuntimeEnv
;
if
(
hasRemainData
(
&
pRuntimeEnv
->
groupResInfo
)
)
{
if
(
pOperator
->
completed
==
OP_RES_TO_RETURN
)
{
toSSDataBlock
(
&
pRuntimeEnv
->
groupResInfo
,
pRuntimeEnv
,
pAggInfo
->
pRes
);
if
(
pAggInfo
->
pRes
->
info
.
rows
==
0
||
!
hasRemainData
(
&
pRuntimeEnv
->
groupResInfo
))
{
pOperator
->
completed
=
true
;
if
(
pAggInfo
->
pRes
->
info
.
rows
==
0
||
!
hasRemainData
InCurrentGroup
(
&
pRuntimeEnv
->
groupResInfo
))
{
pOperator
->
completed
=
OP_EXEC_DONE
;
}
return
pAggInfo
->
pRes
;
...
...
@@ -6363,13 +6346,15 @@ static SSDataBlock* doSTableAggregate(void* param) {
doAggregateImpl
(
pOperator
,
pQuery
->
window
.
skey
,
pAggInfo
->
pCtx
,
pBlock
);
}
pOperator
->
completed
=
OP_RES_TO_RETURN
;
closeAllResultRows
(
&
pAggInfo
->
resultRowInfo
);
updateWindowResNumOfRes_rv
(
pRuntimeEnv
,
pAggInfo
->
pCtx
,
pOperator
->
numOfOutput
,
&
pAggInfo
->
resultRowInfo
,
pAggInfo
->
rowCellInfoOffset
);
initGroupResInfo
(
&
pRuntimeEnv
->
groupResInfo
,
&
pAggInfo
->
resultRowInfo
,
0
);
toSSDataBlock
(
&
pRuntimeEnv
->
groupResInfo
,
pRuntimeEnv
,
pAggInfo
->
pRes
);
if
(
pAggInfo
->
pRes
->
info
.
rows
==
0
||
!
hasRemainData
(
&
pRuntimeEnv
->
groupResInfo
))
{
pOperator
->
completed
=
true
;
if
(
pAggInfo
->
pRes
->
info
.
rows
==
0
||
!
hasRemainData
InCurrentGroup
(
&
pRuntimeEnv
->
groupResInfo
))
{
pOperator
->
completed
=
OP_EXEC_DONE
;
}
return
pAggInfo
->
pRes
;
...
...
@@ -6415,7 +6400,7 @@ static SSDataBlock* doArithmeticOperation(void* param) {
static
SSDataBlock
*
doLimit
(
void
*
param
)
{
SOperatorInfo
*
pOperator
=
(
SOperatorInfo
*
)
param
;
if
(
pOperator
->
completed
)
{
if
(
pOperator
->
completed
==
OP_EXEC_DONE
)
{
return
NULL
;
}
...
...
@@ -6424,7 +6409,7 @@ static SSDataBlock* doLimit(void* param) {
SSDataBlock
*
pBlock
=
pOperator
->
upstream
->
exec
(
pOperator
->
upstream
);
if
(
pBlock
==
NULL
)
{
setQueryStatus
(
pOperator
->
pRuntimeEnv
->
pQuery
,
QUERY_COMPLETED
);
pOperator
->
completed
=
true
;
pOperator
->
completed
=
OP_EXEC_DONE
;
return
NULL
;
}
...
...
@@ -6434,7 +6419,7 @@ static SSDataBlock* doLimit(void* param) {
pInfo
->
total
=
pInfo
->
limit
;
setQueryStatus
(
pOperator
->
pRuntimeEnv
->
pQuery
,
QUERY_COMPLETED
);
pOperator
->
completed
=
true
;
pOperator
->
completed
=
OP_EXEC_DONE
;
}
else
{
pInfo
->
total
+=
pBlock
->
info
.
rows
;
}
...
...
@@ -6442,34 +6427,39 @@ static SSDataBlock* doLimit(void* param) {
return
pBlock
;
}
// TODO add log
static
SSDataBlock
*
doOffset
(
void
*
param
)
{
SOperatorInfo
*
pOperator
=
(
SOperatorInfo
*
)
param
;
if
(
pOperator
->
completed
==
OP_EXEC_DONE
)
{
return
NULL
;
}
S
OffsetOperatorInfo
*
pInfo
=
pOperator
->
info
;
S
QueryRuntimeEnv
*
pRuntimeEnv
=
pOperator
->
pRuntimeEnv
;
while
(
1
)
{
SSDataBlock
*
pBlock
=
pOperator
->
upstream
->
exec
(
pOperator
->
upstream
);
if
(
pBlock
==
NULL
)
{
setQueryStatus
(
pOperator
->
pRuntimeEnv
->
pQuery
,
QUERY_COMPLETED
);
setQueryStatus
(
pRuntimeEnv
->
pQuery
,
QUERY_COMPLETED
);
pOperator
->
completed
=
OP_EXEC_DONE
;
return
NULL
;
}
if
(
p
Info
->
currentOffset
==
0
)
{
if
(
p
RuntimeEnv
->
currentOffset
==
0
)
{
return
pBlock
;
}
else
if
(
p
Info
->
currentOffset
>
pBlock
->
info
.
rows
)
{
p
Info
->
currentOffset
-=
pBlock
->
info
.
rows
;
}
else
if
(
p
RuntimeEnv
->
currentOffset
>
pBlock
->
info
.
rows
)
{
p
RuntimeEnv
->
currentOffset
-=
pBlock
->
info
.
rows
;
}
else
{
int32_t
remain
=
pBlock
->
info
.
rows
-
p
Info
->
currentOffset
;
int32_t
remain
=
pBlock
->
info
.
rows
-
p
RuntimeEnv
->
currentOffset
;
pBlock
->
info
.
rows
=
remain
;
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
numOfCols
;
++
i
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
i
);
int16_t
bytes
=
pColInfoData
->
info
.
bytes
;
memmove
(
pColInfoData
->
pData
,
pColInfoData
->
pData
+
bytes
*
p
Info
->
currentOffset
,
remain
*
bytes
);
memmove
(
pColInfoData
->
pData
,
pColInfoData
->
pData
+
bytes
*
p
RuntimeEnv
->
currentOffset
,
remain
*
bytes
);
}
p
Info
->
currentOffset
=
0
;
p
RuntimeEnv
->
currentOffset
=
0
;
return
pBlock
;
}
}
...
...
@@ -6477,18 +6467,18 @@ static SSDataBlock* doOffset(void* param) {
static
SSDataBlock
*
doHashIntervalAgg
(
void
*
param
)
{
SOperatorInfo
*
pOperator
=
(
SOperatorInfo
*
)
param
;
if
(
pOperator
->
completed
)
{
if
(
pOperator
->
completed
==
OP_EXEC_DONE
)
{
return
NULL
;
}
SHashIntervalOperatorInfo
*
pIntervalInfo
=
pOperator
->
info
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
pOperator
->
pRuntimeEnv
;
if
(
hasRemainData
(
&
pRuntimeEnv
->
groupResInfo
)
)
{
if
(
pOperator
->
completed
==
OP_RES_TO_RETURN
)
{
toSSDataBlock
(
&
pRuntimeEnv
->
groupResInfo
,
pRuntimeEnv
,
pIntervalInfo
->
pRes
);
if
(
pIntervalInfo
->
pRes
->
info
.
rows
==
0
||
!
hasRemainData
(
&
pRuntimeEnv
->
groupResInfo
))
{
pOperator
->
completed
=
true
;
if
(
pIntervalInfo
->
pRes
->
info
.
rows
==
0
||
!
hasRemainData
InCurrentGroup
(
&
pRuntimeEnv
->
groupResInfo
))
{
pOperator
->
completed
=
OP_EXEC_DONE
;
}
return
pIntervalInfo
->
pRes
;
...
...
@@ -6516,6 +6506,7 @@ static SSDataBlock* doHashIntervalAgg(void* param) {
pQuery
->
order
.
order
=
order
;
pQuery
->
window
=
win
;
pOperator
->
completed
=
OP_RES_TO_RETURN
;
closeAllResultRows
(
&
pIntervalInfo
->
resultRowInfo
);
setQueryStatus
(
pRuntimeEnv
->
pQuery
,
QUERY_COMPLETED
);
finalizeQueryResult_rv
(
pOperator
,
pIntervalInfo
->
pCtx
,
&
pIntervalInfo
->
resultRowInfo
,
pIntervalInfo
->
rowCellInfoOffset
);
...
...
@@ -6523,27 +6514,26 @@ static SSDataBlock* doHashIntervalAgg(void* param) {
initGroupResInfo
(
&
pRuntimeEnv
->
groupResInfo
,
&
pIntervalInfo
->
resultRowInfo
,
0
);
toSSDataBlock
(
&
pRuntimeEnv
->
groupResInfo
,
pRuntimeEnv
,
pIntervalInfo
->
pRes
);
if
(
pIntervalInfo
->
pRes
->
info
.
rows
==
0
||
!
hasRemainData
(
&
pRuntimeEnv
->
groupResInfo
))
{
pOperator
->
completed
=
true
;
if
(
pIntervalInfo
->
pRes
->
info
.
rows
==
0
||
!
hasRemainData
InCurrentGroup
(
&
pRuntimeEnv
->
groupResInfo
))
{
pOperator
->
completed
=
OP_EXEC_DONE
;
}
return
pIntervalInfo
->
pRes
;
return
pIntervalInfo
->
pRes
->
info
.
rows
==
0
?
NULL
:
pIntervalInfo
->
pRes
;
}
static
SSDataBlock
*
doSTableIntervalAgg
(
void
*
param
)
{
SOperatorInfo
*
pOperator
=
(
SOperatorInfo
*
)
param
;
if
(
pOperator
->
completed
)
{
if
(
pOperator
->
completed
==
OP_EXEC_DONE
)
{
return
NULL
;
}
SHashIntervalOperatorInfo
*
pIntervalInfo
=
pOperator
->
info
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
pOperator
->
pRuntimeEnv
;
if
(
hasRemainData
(
&
pRuntimeEnv
->
groupResInfo
))
{
toSSDataBlock
(
&
pRuntimeEnv
->
groupResInfo
,
pRuntimeEnv
,
pIntervalInfo
->
pRes
);
if
(
pOperator
->
completed
==
OP_RES_TO_RETURN
)
{
copyResToQueryResultBuf_rv
(
pRuntimeEnv
,
3000
,
pIntervalInfo
->
pRes
,
pIntervalInfo
->
rowCellInfoOffset
);
if
(
pIntervalInfo
->
pRes
->
info
.
rows
==
0
||
!
hasRemainData
(
&
pRuntimeEnv
->
groupResInfo
))
{
pOperator
->
completed
=
true
;
pOperator
->
completed
=
OP_EXEC_DONE
;
}
return
pIntervalInfo
->
pRes
;
...
...
@@ -6571,13 +6561,14 @@ static SSDataBlock* doSTableIntervalAgg(void* param) {
hashIntervalAgg
(
pOperator
,
&
pTableQueryInfo
->
resInfo
,
pIntervalInfo
,
pBlock
,
pTableQueryInfo
->
groupIndex
);
}
pOperator
->
completed
=
OP_RES_TO_RETURN
;
pQuery
->
order
.
order
=
order
;
// TODO : restore the order
doCloseAllTimeWindow
(
pRuntimeEnv
);
setQueryStatus
(
pRuntimeEnv
->
pQuery
,
QUERY_COMPLETED
);
copyResToQueryResultBuf_rv
(
pRuntimeEnv
,
3000
,
pIntervalInfo
->
pRes
,
pIntervalInfo
->
rowCellInfoOffset
);
if
(
pIntervalInfo
->
pRes
->
info
.
rows
==
0
||
!
hasRemainData
(
&
pRuntimeEnv
->
groupResInfo
))
{
pOperator
->
completed
=
true
;
pOperator
->
completed
=
OP_EXEC_DONE
;
}
return
pIntervalInfo
->
pRes
;
...
...
@@ -6585,18 +6576,20 @@ static SSDataBlock* doSTableIntervalAgg(void* param) {
static
SSDataBlock
*
doHashGroupbyAgg
(
void
*
param
)
{
SOperatorInfo
*
pOperator
=
(
SOperatorInfo
*
)
param
;
if
(
pOperator
->
completed
)
{
if
(
pOperator
->
completed
==
OP_EXEC_DONE
)
{
return
NULL
;
}
SHashGroupbyOperatorInfo
*
pInfo
=
pOperator
->
info
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
pOperator
->
pRuntimeEnv
;
if
(
hasRemainData
(
&
pRuntimeEnv
->
groupResInfo
)
)
{
if
(
pOperator
->
completed
==
OP_RES_TO_RETURN
)
{
toSSDataBlock
(
&
pRuntimeEnv
->
groupResInfo
,
pRuntimeEnv
,
pInfo
->
binfo
.
pRes
);
if
(
pInfo
->
binfo
.
pRes
->
info
.
rows
==
0
||
!
hasRemainData
(
&
pRuntimeEnv
->
groupResInfo
))
{
pOperator
->
completed
=
true
;
if
(
pInfo
->
binfo
.
pRes
->
info
.
rows
==
0
||
!
hasRemainDataInCurrentGroup
(
&
pRuntimeEnv
->
groupResInfo
))
{
pOperator
->
completed
=
OP_EXEC_DONE
;
}
return
pInfo
->
binfo
.
pRes
;
}
...
...
@@ -6611,6 +6604,7 @@ static SSDataBlock* doHashGroupbyAgg(void* param) {
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pInfo
->
binfo
.
pCtx
,
pBlock
,
pRuntimeEnv
->
pQuery
->
order
.
order
);
setTagVal_rv
(
pOperator
,
pRuntimeEnv
->
pQuery
->
current
->
pTable
,
pInfo
->
binfo
.
pCtx
,
pOperator
->
numOfOutput
);
if
(
pInfo
->
colIndex
==
-
1
)
{
pInfo
->
colIndex
=
getGroupbyColumnData_rv
(
pRuntimeEnv
->
pQuery
->
pGroupbyExpr
,
pBlock
);
}
...
...
@@ -6618,6 +6612,7 @@ static SSDataBlock* doHashGroupbyAgg(void* param) {
hashGroupbyAgg
(
pRuntimeEnv
,
pOperator
,
pInfo
,
pBlock
);
}
pOperator
->
completed
=
OP_RES_TO_RETURN
;
closeAllResultRows
(
&
pInfo
->
binfo
.
resultRowInfo
);
setQueryStatus
(
pRuntimeEnv
->
pQuery
,
QUERY_COMPLETED
);
...
...
@@ -6630,8 +6625,8 @@ static SSDataBlock* doHashGroupbyAgg(void* param) {
initGroupResInfo
(
&
pRuntimeEnv
->
groupResInfo
,
&
pInfo
->
binfo
.
resultRowInfo
,
0
);
toSSDataBlock
(
&
pRuntimeEnv
->
groupResInfo
,
pRuntimeEnv
,
pInfo
->
binfo
.
pRes
);
if
(
pInfo
->
binfo
.
pRes
->
info
.
rows
==
0
||
!
hasRemainData
(
&
pRuntimeEnv
->
groupResInfo
))
{
pOperator
->
completed
=
true
;
if
(
pInfo
->
binfo
.
pRes
->
info
.
rows
==
0
||
!
hasRemainData
InCurrentGroup
(
&
pRuntimeEnv
->
groupResInfo
))
{
pOperator
->
completed
=
OP_EXEC_DONE
;
}
return
pInfo
->
binfo
.
pRes
;
...
...
@@ -6639,7 +6634,7 @@ static SSDataBlock* doHashGroupbyAgg(void* param) {
static
SSDataBlock
*
doFill
(
void
*
param
)
{
SOperatorInfo
*
pOperator
=
(
SOperatorInfo
*
)
param
;
if
(
pOperator
->
completed
)
{
if
(
pOperator
->
completed
==
OP_EXEC_DONE
)
{
return
NULL
;
}
...
...
@@ -6654,18 +6649,24 @@ static SSDataBlock* doFill(void* param) {
while
(
1
)
{
SSDataBlock
*
pBlock
=
pOperator
->
upstream
->
exec
(
pOperator
->
upstream
);
if
(
pBlock
==
NULL
)
{
if
(
pInfo
->
totalInputRows
==
0
)
{
pOperator
->
completed
=
OP_EXEC_DONE
;
return
NULL
;
}
taosFillSetStartInfo
(
pRuntimeEnv
->
pFillInfo
,
0
,
pRuntimeEnv
->
pQuery
->
window
.
ekey
);
}
else
{
pInfo
->
totalInputRows
+=
pBlock
->
info
.
rows
;
int64_t
ekey
=
Q_STATUS_EQUAL
(
pRuntimeEnv
->
pQuery
->
status
,
QUERY_COMPLETED
)
?
pRuntimeEnv
->
pQuery
->
window
.
ekey
:
pBlock
->
info
.
window
.
ekey
;
taosFillSetStartInfo
(
pRuntimeEnv
->
pFillInfo
,
pBlock
->
info
.
rows
,
ekey
);
taosFillSetInputDataBlock
(
pRuntimeEnv
->
pFillInfo
,
pBlock
);
}
doFillGapsInResults_rv
(
pRuntimeEnv
,
pInfo
->
pRes
);
return
pInfo
->
pRes
;
return
(
pInfo
->
pRes
->
info
.
rows
>
0
)
?
pInfo
->
pRes
:
NULL
;
}
return
pInfo
->
pRes
;
}
// todo set the attribute of query scan count
...
...
@@ -6697,21 +6698,24 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
static
SOperatorInfo
*
createAggregateOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SAggOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SAggOperatorInfo
));
pInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
);
pInfo
->
pCtx
=
createSQLFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
rowCellInfoOffset
);
initResultRowInfo
(
&
pInfo
->
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
pInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
);
setDefaultOutputBuf
(
pRuntimeEnv
,
pInfo
->
pCtx
,
&
pInfo
->
resultRowInfo
,
pInfo
->
pRes
,
pInfo
->
rowCellInfoOffset
);
int64_t
seed
=
rand
(
);
setDefaultOutputBuf
(
pRuntimeEnv
,
pInfo
->
pCtx
,
&
pInfo
->
resultRowInfo
,
pInfo
->
pRes
,
pInfo
->
rowCellInfoOffset
,
seed
);
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"TableAggregate"
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
completed
=
false
;
pOperator
->
completed
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
pOperator
->
upstream
=
upstream
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
seed
=
seed
;
// TODO refactor: seed to move to pInfo??
pOperator
->
exec
=
doAggregate
;
pOperator
->
cleanup
=
destroyBasicOperatorInfo
;
...
...
@@ -6758,7 +6762,7 @@ SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"STableAggregate"
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
completed
=
false
;
pOperator
->
completed
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
pOperator
->
upstream
=
upstream
;
pOperator
->
pExpr
=
pExpr
;
...
...
@@ -6774,22 +6778,26 @@ SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
SOperatorInfo
*
createArithOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SArithOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SArithOperatorInfo
));
int64_t
seed
=
rand
();
pInfo
->
bufCapacity
=
4096
;
pInfo
->
binfo
.
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
);
pInfo
->
binfo
.
pCtx
=
createSQLFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
binfo
.
rowCellInfoOffset
);
SOptrBasicInfo
*
pBInfo
=
&
pInfo
->
binfo
;
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
setDefaultOutputBuf
(
pRuntimeEnv
,
pInfo
->
binfo
.
pCtx
,
&
pInfo
->
binfo
.
resultRowInfo
,
pInfo
->
binfo
.
pRes
,
pInfo
->
binfo
.
rowCellInfoOffset
);
pBInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
);
pBInfo
->
pCtx
=
createSQLFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pBInfo
->
rowCellInfoOffset
);
initResultRowInfo
(
&
pBInfo
->
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
setDefaultOutputBuf
(
pRuntimeEnv
,
pBInfo
->
pCtx
,
&
pBInfo
->
resultRowInfo
,
pBInfo
->
pRes
,
pBInfo
->
rowCellInfoOffset
,
seed
);
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"ArithmeticOp"
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
completed
=
false
;
pOperator
->
completed
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
pOperator
->
upstream
=
upstream
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
seed
=
seed
;
pOperator
->
exec
=
doArithmeticOperation
;
pOperator
->
cleanup
=
destroyArithOperatorInfo
;
...
...
@@ -6805,7 +6813,7 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
pOperator
->
name
=
"LimitOp"
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
completed
=
false
;
pOperator
->
completed
=
OP_IN_EXECUTING
;
pOperator
->
upstream
=
upstream
;
pOperator
->
exec
=
doLimit
;
pOperator
->
info
=
pInfo
;
...
...
@@ -6818,13 +6826,11 @@ SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
SOffsetOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SOffsetOperatorInfo
));
pInfo
->
offset
=
pRuntimeEnv
->
pQuery
->
limit
.
offset
;
pInfo
->
currentOffset
=
pInfo
->
offset
;
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"OffsetOp"
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
completed
=
false
;
pOperator
->
completed
=
OP_IN_EXECUTING
;
pOperator
->
upstream
=
upstream
;
pOperator
->
exec
=
doOffset
;
pOperator
->
info
=
pInfo
;
...
...
@@ -6844,7 +6850,7 @@ SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
pOperator
->
name
=
"HashIntervalAgg"
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
completed
=
false
;
pOperator
->
completed
=
OP_IN_EXECUTING
;
pOperator
->
upstream
=
upstream
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfOutput
=
numOfOutput
;
...
...
@@ -6867,7 +6873,7 @@ SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, S
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"STableIntervalAggOp"
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
completed
=
false
;
pOperator
->
completed
=
OP_IN_EXECUTING
;
pOperator
->
upstream
=
upstream
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfOutput
=
numOfOutput
;
...
...
@@ -6891,7 +6897,7 @@ SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"HashGroupbyAgg"
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
completed
=
false
;
pOperator
->
completed
=
OP_IN_EXECUTING
;
pOperator
->
upstream
=
upstream
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfOutput
=
numOfOutput
;
...
...
@@ -6913,7 +6919,7 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
pOperator
->
name
=
"FillOp"
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
completed
=
false
;
pOperator
->
completed
=
OP_IN_EXECUTING
;
pOperator
->
upstream
=
upstream
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfOutput
=
numOfOutput
;
...
...
@@ -6928,6 +6934,9 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
static
SSDataBlock
*
doTagScan
(
void
*
param
)
{
SOperatorInfo
*
pOperator
=
(
SOperatorInfo
*
)
param
;
if
(
pOperator
->
completed
==
OP_EXEC_DONE
)
{
return
NULL
;
}
STagScanInfo
*
pTagScanInfo
=
pOperator
->
info
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
pOperator
->
pRuntimeEnv
;
...
...
@@ -6935,7 +6944,6 @@ static SSDataBlock* doTagScan(void* param) {
size_t
numOfGroup
=
GET_NUM_OF_TABLEGROUP
(
pRuntimeEnv
);
assert
(
numOfGroup
==
0
||
numOfGroup
==
1
);
if
(
numOfGroup
==
0
)
{
return
NULL
;
}
...
...
@@ -6945,10 +6953,6 @@ static SSDataBlock* doTagScan(void* param) {
size_t
num
=
taosArrayGetSize
(
pa
);
assert
(
num
==
pRuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
);
if
(
pTagScanInfo
->
pRes
==
NULL
)
{
pTagScanInfo
->
pRes
=
createOutputBuf
(
pOperator
->
pExpr
,
pOperator
->
numOfOutput
);
}
int32_t
count
=
0
;
// int32_t functionId = pOperator->pExpr[0].base.functionId;
/*if (functionId == TSDB_FUNC_TID_TAG) { // return the tags & table Id
...
...
@@ -7012,17 +7016,12 @@ static SSDataBlock* doTagScan(void* param) {
qDebug("QInfo:%p create count(tbname) query, res:%d rows:1", pRuntimeEnv->qinfo, count);
} else*/
{
// return only the tags|table name etc.
count
=
0
;
SSchema
*
tbnameSchema
=
tGetTbnameColumnSchema
();
int32_t
maxNumOfTables
=
(
int32_t
)
pQuery
->
rec
.
capacity
;
// if (pQuery->limit.limit >= 0 && pQuery->limit.limit < pQuery->rec.capacity) {
// maxNumOfTables = (int32_t)pQuery->limit.limit;
// }
SExprInfo
*
pExprInfo
=
pOperator
->
pExpr
;
while
(
pRuntimeEnv
->
tableIndex
<
num
&&
count
<
maxNumOfTables
)
{
int32_t
i
=
pRuntimeEnv
->
tableIndex
++
;
SExprInfo
*
pExprInfo
=
pOperator
->
pExpr
;
STableQueryInfo
*
item
=
taosArrayGetP
(
pa
,
i
);
char
*
data
=
NULL
,
*
dst
=
NULL
;
...
...
@@ -7034,23 +7033,19 @@ static SSDataBlock* doTagScan(void* param) {
}
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pTagScanInfo
->
pRes
->
pDataBlock
,
j
);
type
=
pExprInfo
[
j
].
type
;
bytes
=
pExprInfo
[
j
].
bytes
;
if
(
pExprInfo
[
j
].
base
.
colInfo
.
colId
==
TSDB_TBNAME_COLUMN_INDEX
)
{
bytes
=
tbnameSchema
->
bytes
;
type
=
tbnameSchema
->
type
;
data
=
tsdbGetTableName
(
item
->
pTable
);
dst
=
pColInfo
->
pData
+
count
*
tbnameSchema
->
bytes
;
}
else
{
type
=
pExprInfo
[
j
].
type
;
bytes
=
pExprInfo
[
j
].
bytes
;
data
=
tsdbGetTableTagVal
(
item
->
pTable
,
pExprInfo
[
j
].
base
.
colInfo
.
colId
,
type
,
bytes
);
dst
=
pColInfo
->
pData
+
count
*
pExprInfo
[
j
].
bytes
;
}
dst
=
pColInfo
->
pData
+
count
*
pExprInfo
[
j
].
bytes
;
doSetTagValueToResultBuf
(
dst
,
data
,
type
,
bytes
);
}
count
+=
1
;
}
...
...
@@ -7058,20 +7053,21 @@ static SSDataBlock* doTagScan(void* param) {
qDebug
(
"QInfo:%p create tag values results completed, rows:%d"
,
pRuntimeEnv
->
qinfo
,
count
);
}
return
pTagScanInfo
->
pRes
;
return
(
pTagScanInfo
->
pRes
->
info
.
rows
==
0
)
?
NULL
:
pTagScanInfo
->
pRes
;
}
SOperatorInfo
*
createTagScanOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
SOperatorInfo
*
createTagScanOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
STagScanInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STagScanInfo
));
pInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
);
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"SeqTagScanOp"
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
completed
=
false
;
pOperator
->
completed
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
pOperator
->
exec
=
doTagScan
;
pOperator
->
pExpr
=
p
RuntimeEnv
->
pQuery
->
pExpr1
;
pOperator
->
numOfOutput
=
pRuntimeEnv
->
pQuery
->
numOfOutput
;
pOperator
->
pExpr
=
p
Expr
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
return
pOperator
;
...
...
@@ -8503,7 +8499,7 @@ void buildTagQueryResult(SQInfo* pQInfo) {
}
pRuntimeEnv
->
outputBuf
=
pRuntimeEnv
->
proot
->
exec
(
pRuntimeEnv
->
proot
);
pQuery
->
rec
.
rows
=
pRuntimeEnv
->
outputBuf
->
info
.
rows
;
pQuery
->
rec
.
rows
=
(
pRuntimeEnv
->
outputBuf
!=
NULL
)
?
pRuntimeEnv
->
outputBuf
->
info
.
rows
:
0
;
return
;
SArray
*
pa
=
GET_TABLEGROUP
(
pRuntimeEnv
,
0
);
...
...
src/query/src/qFill.c
浏览文件 @
921eaa1c
...
...
@@ -376,11 +376,12 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) {
tfree
(
pFillInfo
->
prevValues
);
tfree
(
pFillInfo
->
nextValues
);
tfree
(
pFillInfo
->
pTags
);
// for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
// tfree(pFillInfo->pData[i]);
// }
for
(
int32_t
i
=
0
;
i
<
pFillInfo
->
numOfTags
;
++
i
)
{
tfree
(
pFillInfo
->
pTags
[
i
].
tagVal
);
}
tfree
(
pFillInfo
->
pTags
);
tfree
(
pFillInfo
->
pData
);
tfree
(
pFillInfo
->
pFillCol
);
...
...
@@ -435,12 +436,16 @@ void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, const tFilePage*
SFillColInfo
*
pCol
=
&
pFillInfo
->
pFillCol
[
i
];
const
char
*
data
=
pInput
->
data
+
pCol
->
col
.
offset
*
pInput
->
num
;
memcpy
(
pFillInfo
->
pData
[
i
],
data
,
(
size_t
)(
pInput
->
num
*
pCol
->
col
.
bytes
));
if
(
pFillInfo
->
pData
[
i
]
==
NULL
)
{
pFillInfo
->
pData
[
i
]
=
calloc
(
4096
,
pCol
->
col
.
bytes
);
}
memcpy
(
pFillInfo
->
pData
[
i
],
data
,
pCol
->
col
.
bytes
*
pInput
->
num
);
// pFillInfo->pData[i] = (char*) data;
if
(
TSDB_COL_IS_TAG
(
pCol
->
flag
))
{
// copy the tag value to tag value buffer
SFillTagColInfo
*
pTag
=
&
pFillInfo
->
pTags
[
pCol
->
tagIndex
];
assert
(
pTag
->
col
.
colId
==
pCol
->
col
.
colId
);
memcpy
(
pTag
->
tagVal
,
data
,
pCol
->
col
.
bytes
);
memcpy
(
pTag
->
tagVal
,
data
,
pCol
->
col
.
bytes
);
// TODO not memcpy??
}
}
}
...
...
src/query/src/qUtil.c
浏览文件 @
921eaa1c
...
...
@@ -355,7 +355,7 @@ void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo,
assert
(
pGroupResInfo
->
index
<=
getNumOfTotalRes
(
pGroupResInfo
));
}
bool
hasRemainData
(
SGroupResInfo
*
pGroupResInfo
)
{
bool
hasRemainData
InCurrentGroup
(
SGroupResInfo
*
pGroupResInfo
)
{
if
(
pGroupResInfo
->
pRows
==
NULL
)
{
return
false
;
}
...
...
@@ -363,6 +363,14 @@ bool hasRemainData(SGroupResInfo* pGroupResInfo) {
return
pGroupResInfo
->
index
<
taosArrayGetSize
(
pGroupResInfo
->
pRows
);
}
bool
hasRemainData
(
SGroupResInfo
*
pGroupResInfo
)
{
if
(
hasRemainDataInCurrentGroup
(
pGroupResInfo
))
{
return
true
;
}
return
pGroupResInfo
->
currentGroup
<
pGroupResInfo
->
totalGroup
;
}
bool
incNextGroup
(
SGroupResInfo
*
pGroupResInfo
)
{
return
(
++
pGroupResInfo
->
currentGroup
)
<
pGroupResInfo
->
totalGroup
;
}
...
...
src/query/src/queryMain.c
浏览文件 @
921eaa1c
...
...
@@ -326,7 +326,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
(
*
pRsp
)
->
numOfRows
=
htonl
((
int32_t
)
pQuery
->
rec
.
rows
);
if
(
pQInfo
->
code
==
TSDB_CODE_SUCCESS
)
{
(
*
pRsp
)
->
offset
=
htobe64
(
pQ
uery
->
limit
.
o
ffset
);
(
*
pRsp
)
->
offset
=
htobe64
(
pQ
Info
->
runtimeEnv
.
currentO
ffset
);
(
*
pRsp
)
->
useconds
=
htobe64
(
pQInfo
->
summary
.
elapsedTime
);
}
else
{
(
*
pRsp
)
->
offset
=
0
;
...
...
tests/script/general/parser/limit1_stb.sim
浏览文件 @
921eaa1c
...
...
@@ -538,6 +538,7 @@ $offset = $offset + 1
sql select max(c1), min(c2), avg(c3), count(c4), sum(c5), spread(c6), first(c7), last(c8), first(c9) from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 8 interval(5m) limit $offset offset $offset
$val = $rowNum - $offset
if $rows != $val then
print expect $val, actual:$rows
return -1
endi
if $data00 != @18-10-22 02:30:00.000@ then
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录