Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b4514c86
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
b4514c86
编写于
4月 19, 2020
作者:
S
slguan
提交者:
GitHub
4月 19, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #1664 from taosdata/feature/query
Feature/query
上级
276bb45c
3a3ccf40
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
205 addition
and
360 deletion
+205
-360
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+2
-2
src/inc/tsdb.h
src/inc/tsdb.h
+3
-17
src/query/inc/qast.h
src/query/inc/qast.h
+0
-2
src/query/inc/queryExecutor.h
src/query/inc/queryExecutor.h
+2
-2
src/query/src/queryExecutor.c
src/query/src/queryExecutor.c
+178
-310
src/tsdb/src/tsdbRead.c
src/tsdb/src/tsdbRead.c
+20
-27
未找到文件。
src/client/src/tscSQLParser.c
浏览文件 @
b4514c86
...
...
@@ -4674,8 +4674,8 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL*
}
// No tables included. No results generated. Query results are empty.
if
(
pTableMetaInfo
->
pTableMeta
==
NULL
)
{
tscTrace
(
"%p no table in
metricmeta
, no output result"
,
pSql
);
if
(
pTableMetaInfo
->
vgroupList
->
numOfVgroups
==
0
)
{
tscTrace
(
"%p no table in
super table
, no output result"
,
pSql
);
pQueryInfo
->
command
=
TSDB_SQL_RETRIEVE_EMPTY_RESULT
;
}
...
...
src/inc/tsdb.h
浏览文件 @
b4514c86
...
...
@@ -145,6 +145,7 @@ typedef struct STableGroupList { // qualified table object list in group
typedef
struct
STsdbQueryCond
{
STimeWindow
twindow
;
int32_t
order
;
// desc/asc order to iterate the data block
int32_t
numOfCols
;
SColumnInfoData
*
colList
;
}
STsdbQueryCond
;
...
...
@@ -189,8 +190,7 @@ typedef void *TsdbPosT;
* @param pTableList table sid list
* @return
*/
TsdbQueryHandleT
*
tsdbQueryTables
(
TsdbRepoT
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupInfo
,
SArray
*
pColumnInfo
);
TsdbQueryHandleT
*
tsdbQueryTables
(
TsdbRepoT
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupInfo
);
/**
* move to next block
...
...
@@ -241,20 +241,6 @@ SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pQueryHandle, SArray *pIdList);
*/
int32_t
tsdbResetQuery
(
TsdbQueryHandleT
*
pQueryHandle
,
STimeWindow
*
window
,
TsdbPosT
position
,
int16_t
order
);
/**
* return the access position of current query handle
* @param pQueryHandle
* @return
*/
int32_t
tsdbDataBlockSeek
(
TsdbQueryHandleT
*
pQueryHandle
,
TsdbPosT
pos
);
/**
* todo remove this function later
* @param pQueryHandle
* @return
*/
TsdbPosT
tsdbDataBlockTell
(
TsdbQueryHandleT
*
pQueryHandle
);
/**
* todo remove this function later
* @param pQueryHandle
...
...
@@ -292,7 +278,7 @@ SArray *tsdbGetTableList(TsdbQueryHandleT *pQueryHandle);
* @param pTagCond. tag query condition
*
*/
int32_t
tsdbQuery
Tags
(
TsdbRepoT
*
tsdb
,
int64_t
uid
,
const
char
*
pTagCond
,
size_t
len
,
STableGroupInfo
*
pGroupList
,
int32_t
tsdbQuery
ByTagsCond
(
TsdbRepoT
*
tsdb
,
int64_t
uid
,
const
char
*
pTagCond
,
size_t
len
,
STableGroupInfo
*
pGroupList
,
SColIndex
*
pColIndex
,
int32_t
numOfCols
);
int32_t
tsdbGetOneTableGroup
(
TsdbRepoT
*
tsdb
,
int64_t
uid
,
STableGroupInfo
*
pGroupInfo
);
...
...
src/query/inc/qast.h
浏览文件 @
b4514c86
...
...
@@ -30,8 +30,6 @@ extern "C" {
struct
tExprNode
;
struct
SSchema
;
struct
tSkipList
;
struct
tSkipListNode
;
enum
{
TSQL_NODE_EXPR
=
0x1
,
...
...
src/query/inc/queryExecutor.h
浏览文件 @
b4514c86
...
...
@@ -170,11 +170,11 @@ typedef struct SQInfo {
int32_t
pointsInterpo
;
int32_t
code
;
// error code to returned to client
sem_t
dataReady
;
STableGroupInfo
groupInfo
;
// table id list
void
*
tsdb
;
STableGroupInfo
groupInfo
;
// table id list
SQueryRuntimeEnv
runtimeEnv
;
int32_t
subgroupId
x
;
int32_t
groupInde
x
;
int32_t
offset
;
/* offset in group result set of subgroup */
T_REF_DECLARE
()
...
...
src/query/src/queryExecutor.c
浏览文件 @
b4514c86
...
...
@@ -48,6 +48,7 @@
#define GET_QINFO_ADDR(x) ((void*)((char *)(x)-offsetof(SQInfo, runtimeEnv)))
#define GET_COL_DATA_POS(query, index, step) ((query)->pos + (index) * (step))
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC))
/* get the qinfo struct address from the query struct address */
#define GET_COLUMN_BYTES(query, colidx) \
...
...
@@ -82,15 +83,24 @@ typedef enum {
QUERY_OVER
=
0x8u
,
}
vnodeQueryStatus
;
static
void
setQueryStatus
(
SQuery
*
pQuery
,
int8_t
status
);
bool
isIntervalQuery
(
SQuery
*
pQuery
)
{
return
pQuery
->
intervalTime
>
0
;
}
enum
{
TS_JOIN_TS_EQUAL
=
0
,
TS_JOIN_TS_NOT_EQUALS
=
1
,
TS_JOIN_TAG_NOT_EQUALS
=
2
,
};
typedef
struct
{
int32_t
status
;
// query status
TSKEY
lastKey
;
// the lastKey value before query executed
STimeWindow
w
;
// whole query time window
STimeWindow
current
;
// current query window
int32_t
windowIndex
;
// index of active time window result for interval query
STSCursor
cur
;
}
SQueryStatusInfo
;
static
void
setQueryStatus
(
SQuery
*
pQuery
,
int8_t
status
);
bool
isIntervalQuery
(
SQuery
*
pQuery
)
{
return
pQuery
->
intervalTime
>
0
;
}
static
int32_t
mergeIntoGroupResultImpl
(
SQInfo
*
pQInfo
,
SArray
*
group
);
static
void
setWindowResOutputBuf
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SWindowResult
*
pResult
);
...
...
@@ -2224,111 +2234,11 @@ char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWi
pQuery
->
pSelectExpr
[
columnIndex
].
resBytes
*
realRowId
;
}
int32_t
UNUSED_FUNC
vnodeSTableQueryPrepare
(
SQInfo
*
pQInfo
,
SQuery
*
pQuery
,
void
*
param
)
{
if
((
QUERY_IS_ASC_QUERY
(
pQuery
)
&&
(
pQuery
->
window
.
skey
>
pQuery
->
window
.
ekey
))
||
(
!
QUERY_IS_ASC_QUERY
(
pQuery
)
&&
(
pQuery
->
window
.
ekey
>
pQuery
->
window
.
skey
)))
{
qTrace
(
"QInfo:%p no result in time range %"
PRId64
"-%"
PRId64
", order %d"
,
pQInfo
,
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
pQuery
->
order
.
order
);
sem_post
(
&
pQInfo
->
dataReady
);
return
TSDB_CODE_SUCCESS
;
}
pQuery
->
status
=
0
;
pQuery
->
rec
=
(
SResultRec
){
0
};
changeExecuteScanOrder
(
pQuery
,
true
);
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
/*
* since we employ the output control mechanism in main loop.
* so, disable it during data block scan procedure.
*/
setScanLimitationByResultBuffer
(
pQuery
);
// save raw query range for applying to each subgroup
pQuery
->
lastKey
=
pQuery
->
window
.
skey
;
// create runtime environment
// SColumnModel *pTagSchemaInfo = pQInfo->pSidSet->pColumnModel;
// get one queried meter
assert
(
0
);
// SMeterObj *pMeter = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[0]->sid);
pRuntimeEnv
->
pTSBuf
=
param
;
pRuntimeEnv
->
cur
.
vnodeIndex
=
-
1
;
// set the ts-comp file traverse order
if
(
param
!=
NULL
)
{
int16_t
order
=
(
pQuery
->
order
.
order
==
pRuntimeEnv
->
pTSBuf
->
tsOrder
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
tsBufSetTraverseOrder
(
pRuntimeEnv
->
pTSBuf
,
order
);
}
assert
(
0
);
// int32_t ret = setupQueryRuntimeEnv(pMeter, pQuery, &pQInfo->runtimeEnv, pTagSchemaInfo, TSDB_ORDER_ASC, true);
// if (ret != TSDB_CODE_SUCCESS) {
// return ret;
// }
// createTableGroup(pQInfo->pSidSet);
int32_t
size
=
getInitialPageNum
(
pQInfo
);
int32_t
ret
=
createDiskbasedResultBuffer
(
&
pRuntimeEnv
->
pResultBuf
,
size
,
pQuery
->
rowSize
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
ret
;
}
if
(
pQuery
->
intervalTime
==
0
)
{
int16_t
type
=
TSDB_DATA_TYPE_NULL
;
if
(
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
))
{
// group by columns not tags;
type
=
getGroupbyColumnType
(
pQuery
,
pQuery
->
pGroupbyExpr
);
}
else
{
type
=
TSDB_DATA_TYPE_INT
;
// group id
}
initWindowResInfo
(
&
pRuntimeEnv
->
windowResInfo
,
pRuntimeEnv
,
512
,
4096
,
type
);
}
pRuntimeEnv
->
numOfRowsPerPage
=
getNumOfRowsInResultPage
(
pQuery
,
true
);
STsdbQueryCond
cond
=
{
.
twindow
=
(
STimeWindow
)
{.
skey
=
pQuery
->
window
.
skey
,
.
ekey
=
pQuery
->
window
.
ekey
},
.
order
=
pQuery
->
order
.
order
,
.
colList
=
pQuery
->
colList
,
};
// for(int32_t i = 0; i < pQInfo->pSidSet->numOfTables; ++i) {
// SMeterObj *p1 = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[i]->sid);
// taosArrayPush(sa, &p1);
// }
SArray
*
cols
=
taosArrayInit
(
pQuery
->
numOfCols
,
sizeof
(
pQuery
->
colList
[
0
]));
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfCols
;
++
i
)
{
taosArrayPush
(
cols
,
&
pQuery
->
colList
[
i
]);
}
pRuntimeEnv
->
pQueryHandle
=
tsdbQueryTables
(
NULL
,
&
cond
,
&
pQInfo
->
groupInfo
,
cols
);
// metric query do not invoke interpolation, it will be done at the second-stage merge
if
(
!
isPointInterpoQuery
(
pQuery
))
{
pQuery
->
interpoType
=
TSDB_INTERPO_NONE
;
}
TSKEY
revisedStime
=
taosGetIntervalStartTimestamp
(
pQuery
->
window
.
skey
,
pQuery
->
intervalTime
,
pQuery
->
slidingTimeUnit
,
pQuery
->
precision
);
taosInitInterpoInfo
(
&
pRuntimeEnv
->
interpoInfo
,
pQuery
->
order
.
order
,
revisedStime
,
0
,
0
);
pRuntimeEnv
->
stableQuery
=
true
;
return
TSDB_CODE_SUCCESS
;
}
/**
* decrease the refcount for each table involved in this query
* @param pQInfo
*/
void
vnodeDecMeterRefcnt
(
SQInfo
*
pQInfo
)
{
UNUSED_FUNC
void
vnodeDecMeterRefcnt
(
SQInfo
*
pQInfo
)
{
if
(
pQInfo
!=
NULL
)
{
// assert(taosHashGetSize(pQInfo->groupInfo) >= 1);
}
...
...
@@ -2362,7 +2272,7 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
#endif
}
void
setTimestampRange
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int64_t
stime
,
int64_t
etime
)
{
UNUSED_FUNC
void
setTimestampRange
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int64_t
stime
,
int64_t
etime
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutputCols
;
++
i
)
{
...
...
@@ -2907,14 +2817,14 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
int32_t
numOfGroups
=
taosArrayGetSize
(
pQInfo
->
groupInfo
.
pGroupList
);
while
(
pQInfo
->
subgroupId
x
<
numOfGroups
)
{
SArray
*
group
=
taosArrayGetP
(
pQInfo
->
groupInfo
.
pGroupList
,
pQInfo
->
subgroupId
x
);
while
(
pQInfo
->
groupInde
x
<
numOfGroups
)
{
SArray
*
group
=
taosArrayGetP
(
pQInfo
->
groupInfo
.
pGroupList
,
pQInfo
->
groupInde
x
);
ret
=
mergeIntoGroupResultImpl
(
pQInfo
,
group
);
if
(
ret
<
0
)
{
// not enough disk space to save the data into disk
return
-
1
;
}
pQInfo
->
subgroupId
x
+=
1
;
pQInfo
->
groupInde
x
+=
1
;
// this group generates at least one result, return results
if
(
ret
>
0
)
{
...
...
@@ -2922,11 +2832,11 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
}
assert
(
pQInfo
->
numOfGroupResultPages
==
0
);
qTrace
(
"QInfo:%p no result in group %d, continue"
,
pQInfo
,
pQInfo
->
subgroupId
x
-
1
);
qTrace
(
"QInfo:%p no result in group %d, continue"
,
pQInfo
,
pQInfo
->
groupInde
x
-
1
);
}
qTrace
(
"QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms"
,
pQInfo
,
pQInfo
->
subgroupId
x
-
1
,
numOfGroups
,
taosGetTimestampMs
()
-
st
);
pQInfo
,
pQInfo
->
groupInde
x
-
1
,
numOfGroups
,
taosGetTimestampMs
()
-
st
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -2941,7 +2851,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
}
// set current query completed
// if (pQInfo->numOfGroupResultPages == 0 && pQInfo->
subgroupId
x == pQInfo->pSidSet->numOfSubSet) {
// if (pQInfo->numOfGroupResultPages == 0 && pQInfo->
groupInde
x == pQInfo->pSidSet->numOfSubSet) {
// pQInfo->tableIndex = pQInfo->pSidSet->numOfTables;
// return;
// }
...
...
@@ -2950,7 +2860,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SDiskbasedResultBuf
*
pResultBuf
=
pRuntimeEnv
->
pResultBuf
;
int32_t
id
=
getGroupResultId
(
pQInfo
->
subgroupId
x
-
1
);
int32_t
id
=
getGroupResultId
(
pQInfo
->
groupInde
x
-
1
);
SIDList
list
=
getDataBufPagesIdList
(
pResultBuf
,
pQInfo
->
offset
+
id
);
int32_t
total
=
0
;
...
...
@@ -3156,7 +3066,7 @@ int32_t flushFromResultBuf(SQInfo *pQInfo) {
r
=
capacity
;
}
int32_t
id
=
getGroupResultId
(
pQInfo
->
subgroupId
x
)
+
pQInfo
->
numOfGroupResultPages
;
int32_t
id
=
getGroupResultId
(
pQInfo
->
groupInde
x
)
+
pQInfo
->
numOfGroupResultPages
;
tFilePage
*
buf
=
getNewDataBuf
(
pResultBuf
,
id
,
&
pageId
);
// pagewise copy to dest buffer
...
...
@@ -3205,8 +3115,8 @@ static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo *
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutputCols
;
++
j
)
{
int32_t
functId
=
pQuery
->
pSelectExpr
[
j
].
pBase
.
functionId
;
if
(((
functId
==
TSDB_FUNC_FIRST
||
functId
==
TSDB_FUNC_FIRST_DST
)
&&
order
==
TSDB_ORDER_
DE
SC
)
||
((
functId
==
TSDB_FUNC_LAST
||
functId
==
TSDB_FUNC_LAST_DST
)
&&
order
==
TSDB_ORDER_
A
SC
))
{
if
(((
functId
==
TSDB_FUNC_FIRST
||
functId
==
TSDB_FUNC_FIRST_DST
)
&&
order
==
TSDB_ORDER_
A
SC
)
||
((
functId
==
TSDB_FUNC_LAST
||
functId
==
TSDB_FUNC_LAST_DST
)
&&
order
==
TSDB_ORDER_
DE
SC
))
{
buf
->
resultInfo
[
j
].
complete
=
false
;
}
else
if
(
functId
!=
TSDB_FUNC_TS
&&
functId
!=
TSDB_FUNC_TAG
)
{
buf
->
resultInfo
[
j
].
complete
=
true
;
...
...
@@ -3215,32 +3125,28 @@ static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo *
}
}
void
disableFunc
tForTableSuppleScan
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
order
)
{
void
disableFunc
InReverseScan
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
int32_t
order
=
pQuery
->
order
.
order
;
// group by normal columns and interval query on normal table
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutputCols
;
++
i
)
{
pRuntimeEnv
->
pCtx
[
i
].
order
=
(
pRuntimeEnv
->
pCtx
[
i
].
order
)
^
1u
;
}
SWindowResInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
windowResInfo
;
if
(
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
)
||
isIntervalQuery
(
pQuery
))
{
doDisableFunctsForSupplementaryScan
(
pQuery
,
pWindowResInfo
,
order
);
}
else
{
// for simple result of table query,
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutputCols
;
++
j
)
{
int32_t
functId
=
pQuery
->
pSelectExpr
[
j
].
pBase
.
functionId
;
int32_t
functId
=
pQuery
->
pSelectExpr
[
j
].
pBase
.
functionId
;
SQLFunctionCtx
*
pCtx
=
&
pRuntimeEnv
->
pCtx
[
j
];
if
(((
functId
==
TSDB_FUNC_FIRST
||
functId
==
TSDB_FUNC_FIRST_DST
)
&&
order
==
TSDB_ORDER_
DE
SC
)
||
((
functId
==
TSDB_FUNC_LAST
||
functId
==
TSDB_FUNC_LAST_DST
)
&&
order
==
TSDB_ORDER_
A
SC
))
{
if
(((
functId
==
TSDB_FUNC_FIRST
||
functId
==
TSDB_FUNC_FIRST_DST
)
&&
order
==
TSDB_ORDER_
A
SC
)
||
((
functId
==
TSDB_FUNC_LAST
||
functId
==
TSDB_FUNC_LAST_DST
)
&&
order
==
TSDB_ORDER_
DE
SC
))
{
pCtx
->
resultInfo
->
complete
=
false
;
}
else
if
(
functId
!=
TSDB_FUNC_TS
&&
functId
!=
TSDB_FUNC_TAG
)
{
pCtx
->
resultInfo
->
complete
=
true
;
}
}
}
pQuery
->
order
.
order
=
pQuery
->
order
.
order
^
1u
;
}
void
disableFuncForReverseScan
(
SQInfo
*
pQInfo
,
int32_t
order
)
{
...
...
@@ -3266,14 +3172,11 @@ void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) {
pQuery
->
order
.
order
=
(
pQuery
->
order
.
order
)
^
1u
;
}
void
enableFuncForForwardScan
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
order
)
{
void
switchCtxOrder
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutputCols
;
++
i
)
{
pRuntimeEnv
->
pCtx
[
i
].
order
=
(
pRuntimeEnv
->
pCtx
[
i
].
order
)
^
1u
;
SWITCH_ORDER
(
pRuntimeEnv
->
pCtx
[
i
].
order
);
// = (pRuntimeEnv->pCtx[i].order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC
;
}
pQuery
->
order
.
order
=
(
pQuery
->
order
.
order
)
^
1u
;
}
void
createQueryResultInfo
(
SQuery
*
pQuery
,
SWindowResult
*
pResultRow
,
bool
isSTableQuery
,
SPosInfo
*
posInfo
)
{
...
...
@@ -3387,70 +3290,6 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) {
}
}
typedef
struct
SQueryStatus
{
int8_t
overStatus
;
TSKEY
lastKey
;
STSCursor
cur
;
}
SQueryStatus
;
// todo refactor
static
void
queryStatusSave
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SQueryStatus
*
pStatus
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
pStatus
->
overStatus
=
pQuery
->
status
;
pStatus
->
lastKey
=
pQuery
->
lastKey
;
pStatus
->
cur
=
tsBufGetCursor
(
pRuntimeEnv
->
pTSBuf
);
// save the cursor
if
(
pRuntimeEnv
->
pTSBuf
)
{
pRuntimeEnv
->
pTSBuf
->
cur
.
order
^=
1u
;
tsBufNextPos
(
pRuntimeEnv
->
pTSBuf
);
}
setQueryStatus
(
pQuery
,
QUERY_NOT_COMPLETED
);
SWAP
(
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
TSKEY
);
pQuery
->
lastKey
=
pQuery
->
window
.
skey
;
}
static
void
queryStatusRestore
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SQueryStatus
*
pStatus
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SWAP
(
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
TSKEY
);
pQuery
->
lastKey
=
pStatus
->
lastKey
;
pQuery
->
status
=
pStatus
->
overStatus
;
tsBufSetCursor
(
pRuntimeEnv
->
pTSBuf
,
&
pStatus
->
cur
);
}
static
void
doSingleMeterSupplementScan
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQueryStatus
qStatus
=
{
0
};
if
(
!
needReverseScan
(
pQuery
))
{
return
;
}
qTrace
(
"QInfo:%p start to supp scan"
,
GET_QINFO_ADDR
(
pQuery
));
SET_SUPPLEMENT_SCAN_FLAG
(
pRuntimeEnv
);
// close necessary function execution during supplementary scan
disableFunctForTableSuppleScan
(
pRuntimeEnv
,
pQuery
->
order
.
order
);
queryStatusSave
(
pRuntimeEnv
,
&
qStatus
);
STimeWindow
w
=
{.
skey
=
pQuery
->
window
.
skey
,
.
ekey
=
pQuery
->
window
.
ekey
};
// reverse scan from current position
TsdbPosT
current
=
tsdbDataBlockTell
(
pRuntimeEnv
->
pQueryHandle
);
tsdbResetQuery
(
pRuntimeEnv
->
pQueryHandle
,
&
w
,
current
,
pQuery
->
order
.
order
);
doScanAllDataBlocks
(
pRuntimeEnv
);
queryStatusRestore
(
pRuntimeEnv
,
&
qStatus
);
enableFuncForForwardScan
(
pRuntimeEnv
,
pQuery
->
order
.
order
);
SET_MASTER_SCAN_FLAG
(
pRuntimeEnv
);
}
void
setQueryStatus
(
SQuery
*
pQuery
,
int8_t
status
)
{
if
(
status
==
QUERY_NOT_COMPLETED
)
{
pQuery
->
status
=
status
;
...
...
@@ -3506,82 +3345,140 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
return
toContinue
;
}
static
SQueryStatusInfo
getQueryStatusInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQueryStatusInfo
info
=
{
.
status
=
pQuery
->
status
,
.
windowIndex
=
pRuntimeEnv
->
windowResInfo
.
curIndex
,
.
lastKey
=
pQuery
->
lastKey
,
.
w
=
pQuery
->
window
,
};
return
info
;
}
static
void
setEnvBeforeReverseScan
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SQueryStatusInfo
*
pStatus
)
{
SQInfo
*
pQInfo
=
GET_QINFO_ADDR
(
pRuntimeEnv
);
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
// the step should be placed before order changed
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pQuery
->
order
.
order
);
pStatus
->
cur
=
tsBufGetCursor
(
pRuntimeEnv
->
pTSBuf
);
// save the cursor
if
(
pRuntimeEnv
->
pTSBuf
)
{
SWITCH_ORDER
(
pRuntimeEnv
->
pTSBuf
->
cur
.
order
);
tsBufNextPos
(
pRuntimeEnv
->
pTSBuf
);
}
// reverse order time range
pQuery
->
window
.
skey
=
pQuery
->
lastKey
-
step
;
pQuery
->
window
.
ekey
=
pStatus
->
lastKey
;
// the start timestamp of current query
SWITCH_ORDER
(
pQuery
->
order
.
order
);
SET_SUPPLEMENT_SCAN_FLAG
(
pRuntimeEnv
);
STsdbQueryCond
cond
=
{
.
twindow
=
pQuery
->
window
,
.
order
=
pQuery
->
order
.
order
,
.
colList
=
pQuery
->
colList
,
.
numOfCols
=
pQuery
->
numOfCols
,
};
// clean unused handle
if
(
pRuntimeEnv
->
pSecQueryHandle
!=
NULL
)
{
tsdbCleanupQueryHandle
(
pRuntimeEnv
->
pSecQueryHandle
);
}
pRuntimeEnv
->
pSecQueryHandle
=
tsdbQueryTables
(
pQInfo
->
tsdb
,
&
cond
,
&
pQInfo
->
groupInfo
);
setQueryStatus
(
pQuery
,
QUERY_NOT_COMPLETED
);
switchCtxOrder
(
pRuntimeEnv
);
disableFuncInReverseScan
(
pRuntimeEnv
);
}
static
void
clearEnvAfterReverseScan
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
TSKEY
lastKey
,
SQueryStatusInfo
*
pStatus
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SWITCH_ORDER
(
pQuery
->
order
.
order
);
switchCtxOrder
(
pRuntimeEnv
);
tsBufSetCursor
(
pRuntimeEnv
->
pTSBuf
,
&
pStatus
->
cur
);
if
(
pRuntimeEnv
->
pTSBuf
)
{
pRuntimeEnv
->
pTSBuf
->
cur
.
order
=
pQuery
->
order
.
order
;
}
SET_MASTER_SCAN_FLAG
(
pRuntimeEnv
);
// update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query
// during reverse scan
pQuery
->
lastKey
=
lastKey
;
pQuery
->
status
=
pStatus
->
status
;
pQuery
->
window
=
pStatus
->
w
;
}
void
scanAllDataBlocks
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
setQueryStatus
(
pQuery
,
QUERY_NOT_COMPLETED
);
// store the start query position
SQInfo
*
pQInfo
=
(
SQInfo
*
)
GET_QINFO_ADDR
(
pRuntimeEnv
);
int64_t
skey
=
pQuery
->
lastKey
;
int32_t
status
=
pQuery
->
status
;
int32_t
activeSlot
=
pRuntimeEnv
->
windowResInfo
.
curIndex
;
SQueryStatusInfo
qstatus
=
getQueryStatusInfo
(
pRuntimeEnv
);
SET_MASTER_SCAN_FLAG
(
pRuntimeEnv
);
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pQuery
->
order
.
order
);
while
(
1
)
{
doScanAllDataBlocks
(
pRuntimeEnv
);
if
(
pRuntimeEnv
->
scanFlag
==
MASTER_SCAN
)
{
qstatus
.
status
=
pQuery
->
status
;
}
if
(
!
needScanDataBlocksAgain
(
pRuntimeEnv
))
{
// restore the status
// restore the status code and jump out of loop
if
(
pRuntimeEnv
->
scanFlag
==
REPEAT_SCAN
)
{
pQuery
->
status
=
status
;
pQuery
->
status
=
qstatus
.
status
;
}
break
;
}
// set the correct start position, and load the corresponding block in buffer for next round scan all data blocks.
// /*int32_t ret =*/ tsdbDataBlockSeek(pRuntimeEnv->pQueryHandle, pos);
STsdbQueryCond
cond
=
{
.
twindow
=
{
pQuery
->
window
.
skey
,
pQuery
->
lastKey
},
.
order
=
pQuery
->
order
.
order
,
.
colList
=
pQuery
->
colList
,
.
twindow
=
{.
skey
=
qstatus
.
lastKey
,
.
ekey
=
pQuery
->
lastKey
-
step
},
.
order
=
pQuery
->
order
.
order
,
.
colList
=
pQuery
->
colList
,
.
numOfCols
=
pQuery
->
numOfCols
,
};
SArray
*
cols
=
taosArrayInit
(
pQuery
->
numOfCols
,
sizeof
(
pQuery
->
colList
[
0
]));
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfCols
;
++
i
)
{
taosArrayPush
(
cols
,
&
pQuery
->
colList
[
i
]);
}
if
(
pRuntimeEnv
->
pSecQueryHandle
!=
NULL
)
{
pRuntimeEnv
->
pSecQueryHandle
=
tsdbQueryTables
(
pQInfo
->
tsdb
,
&
cond
,
&
pQInfo
->
groupInfo
,
cols
);
tsdbCleanupQueryHandle
(
pRuntimeEnv
->
pSecQueryHandle
);
}
taosArrayDestroy
(
cols
);
status
=
pQuery
->
status
;
pRuntimeEnv
->
windowResInfo
.
curIndex
=
activeSlot
;
pRuntimeEnv
->
pSecQueryHandle
=
tsdbQueryTables
(
pQInfo
->
tsdb
,
&
cond
,
&
pQInfo
->
groupInfo
);
pRuntimeEnv
->
windowResInfo
.
curIndex
=
qstatus
.
windowIndex
;
setQueryStatus
(
pQuery
,
QUERY_NOT_COMPLETED
);
pRuntimeEnv
->
scanFlag
=
REPEAT_SCAN
;
// check if query is killed or not
if
(
isQueryKilled
(
GET_QINFO_ADDR
(
pRuntimeEnv
)
))
{
if
(
isQueryKilled
(
pQInfo
))
{
return
;
}
}
if
(
!
needReverseScan
(
pQuery
))
{
return
;
}
TSKEY
lastKey
=
pQuery
->
lastKey
;
setEnvBeforeReverseScan
(
pRuntimeEnv
,
&
qstatus
);
// no need to set the end key
TSKEY
lkey
=
pQuery
->
lastKey
;
TSKEY
ekey
=
pQuery
->
window
.
ekey
;
pQuery
->
window
.
skey
=
skey
;
pQuery
->
window
.
ekey
=
pQuery
->
lastKey
-
step
;
/*tsdbpos_t current =*/
tsdbDataBlockTell
(
pRuntimeEnv
->
pQueryHandle
);
doSingleMeterSupplementScan
(
pRuntimeEnv
);
// update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query during reverse scan
pQuery
->
lastKey
=
lkey
;
pQuery
->
window
.
ekey
=
ekey
;
// STimeWindow win = {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey};
// tsdbResetQuery(pRuntimeEnv->pQueryHandle, &win, current, pQuery->order.order);
// tsdbNextDataBlock(pRuntimeEnv->pQueryHandle);
// reverse scan from current position
qTrace
(
"QInfo:%p start to reverse scan"
,
GET_QINFO_ADDR
(
pRuntimeEnv
));
doScanAllDataBlocks
(
pRuntimeEnv
);
clearEnvAfterReverseScan
(
pRuntimeEnv
,
lastKey
,
&
qstatus
);
}
void
finalizeQueryResult
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
...
...
@@ -3875,17 +3772,17 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde
int32_t
totalSubset
=
getNumOfSubset
(
pQInfo
);
if
(
orderType
==
TSDB_ORDER_ASC
)
{
startIdx
=
pQInfo
->
subgroupId
x
;
startIdx
=
pQInfo
->
groupInde
x
;
step
=
1
;
}
else
{
// desc order copy all data
startIdx
=
totalSubset
-
pQInfo
->
subgroupId
x
-
1
;
startIdx
=
totalSubset
-
pQInfo
->
groupInde
x
-
1
;
step
=
-
1
;
}
for
(
int32_t
i
=
startIdx
;
(
i
<
totalSubset
)
&&
(
i
>=
0
);
i
+=
step
)
{
if
(
result
[
i
].
numOfRows
==
0
)
{
pQInfo
->
offset
=
0
;
pQInfo
->
subgroupId
x
+=
1
;
pQInfo
->
groupInde
x
+=
1
;
continue
;
}
...
...
@@ -3903,7 +3800,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde
pQInfo
->
offset
+=
numOfRowsToCopy
;
}
else
{
pQInfo
->
offset
=
0
;
pQInfo
->
subgroupId
x
+=
1
;
pQInfo
->
groupInde
x
+=
1
;
}
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutputCols
;
++
j
)
{
...
...
@@ -4174,18 +4071,13 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery)
pQuery
->
lastKey
=
pQuery
->
window
.
skey
;
STsdbQueryCond
cond
=
{
.
twindow
=
pQuery
->
window
,
.
order
=
pQuery
->
order
.
order
,
.
colList
=
pQuery
->
colList
,
.
twindow
=
pQuery
->
window
,
.
order
=
pQuery
->
order
.
order
,
.
colList
=
pQuery
->
colList
,
.
numOfCols
=
pQuery
->
numOfCols
,
};
SArray
*
cols
=
taosArrayInit
(
pQuery
->
numOfCols
,
sizeof
(
pQuery
->
colList
[
0
]));
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfCols
;
++
i
)
{
taosArrayPush
(
cols
,
&
pQuery
->
colList
[
i
]);
}
pRuntimeEnv
->
pQueryHandle
=
tsdbQueryTables
(
tsdb
,
&
cond
,
&
pQInfo
->
groupInfo
,
cols
);
taosArrayDestroy
(
cols
);
pRuntimeEnv
->
pQueryHandle
=
tsdbQueryTables
(
tsdb
,
&
cond
,
&
pQInfo
->
groupInfo
);
pQInfo
->
tsdb
=
tsdb
;
pRuntimeEnv
->
pQuery
=
pQuery
;
...
...
@@ -4403,25 +4295,19 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
.
twindow
=
{
pInfo
->
pTableQInfo
->
lastKey
,
pInfo
->
pTableQInfo
->
win
.
ekey
},
.
order
=
pQuery
->
order
.
order
,
.
colList
=
pQuery
->
colList
,
.
numOfCols
=
pQuery
->
numOfCols
,
};
SArray
*
cols
=
taosArrayInit
(
pQuery
->
numOfCols
,
sizeof
(
pQuery
->
colList
[
0
]));
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfCols
;
++
i
)
{
taosArrayPush
(
cols
,
&
pQuery
->
colList
[
i
]);
}
SArray
*
g1
=
taosArrayInit
(
1
,
POINTER_BYTES
);
STableGroupInfo
gp
=
{.
numOfTables
=
1
,
.
pGroupList
=
g1
};
SArray
*
tx
=
taosArrayInit
(
1
,
sizeof
(
SPair
));
taosArrayPush
(
tx
,
p
);
taosArrayPush
(
tx
,
p
);
taosArrayPush
(
g1
,
&
tx
);
STableGroupInfo
gp
=
{.
numOfTables
=
1
,
.
pGroupList
=
g1
};
// include only current table
pRuntimeEnv
->
pQueryHandle
=
tsdbQueryTables
(
pQInfo
->
tsdb
,
&
cond
,
&
gp
,
cols
);
// vnodeUpdateQueryColumnIndex(pQuery, pRuntimeEnv->pMeterObj);
// vnodeUpdateFilterColumnIndex(pQuery);
pRuntimeEnv
->
pQueryHandle
=
tsdbQueryTables
(
pQInfo
->
tsdb
,
&
cond
,
&
gp
);
if
(
pRuntimeEnv
->
pTSBuf
!=
NULL
)
{
if
(
pRuntimeEnv
->
cur
.
vnodeIndex
==
-
1
)
{
...
...
@@ -4501,14 +4387,14 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
assert
(
pQuery
->
limit
.
offset
==
0
&&
pQuery
->
limit
.
limit
!=
0
);
#if 0
while (pQInfo->
subgroupId
x < numOfGroups) {
while (pQInfo->
groupInde
x < numOfGroups) {
SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->
subgroupId
x);
SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->
groupInde
x);
size_t numOfTable = taosArrayGetSize(group);
if (isFirstLastRowQuery(pQuery)) {
qTrace("QInfo:%p last_row query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet,
pQInfo->
subgroupId
x);
pQInfo->
groupInde
x);
TSKEY key = -1;
int32_t index = -1;
...
...
@@ -4529,7 +4415,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
// assert(num >= 0);
} else {
qTrace("QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet,
pQInfo->
subgroupId
x);
pQInfo->
groupInde
x);
for (int32_t k = start; k <= end; ++k) {
if (isQueryKilled(pQInfo)) {
...
...
@@ -4547,7 +4433,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
}
}
pSupporter->
subgroupId
x++;
pSupporter->
groupInde
x++;
// output buffer is full, return to client
if (pQuery->size >= pQuery->pointsToRead) {
...
...
@@ -4564,7 +4450,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
* if the subgroup index is larger than 0, results generated by group by tbname,k is existed.
* we need to return it to client in the first place.
*/
if
(
pQInfo
->
subgroupId
x
>
0
)
{
if
(
pQInfo
->
groupInde
x
>
0
)
{
copyFromWindowResToSData
(
pQInfo
,
pRuntimeEnv
->
windowResInfo
.
pResult
);
pQuery
->
rec
.
total
+=
pQuery
->
rec
.
rows
;
...
...
@@ -4585,13 +4471,11 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
assert
(
taosArrayGetSize
(
group
)
==
pQInfo
->
groupInfo
.
numOfTables
&&
1
==
taosArrayGetSize
(
pQInfo
->
groupInfo
.
pGroupList
));
while
(
pQInfo
->
tableIndex
<
pQInfo
->
groupInfo
.
numOfTables
)
{
int32_t
k
=
pQInfo
->
tableIndex
;
if
(
isQueryKilled
(
pQInfo
))
{
return
;
}
SPair
*
p
=
taosArrayGet
(
group
,
k
);
SPair
*
p
=
taosArrayGet
(
group
,
pQInfo
->
tableIndex
);
STableDataInfo
*
pInfo
=
p
->
sec
;
TSKEY
skey
=
pInfo
->
pTableQInfo
->
lastKey
;
...
...
@@ -4599,7 +4483,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
pQuery
->
window
.
skey
=
skey
;
}
if
(
!
multiTableMultioutputHelper
(
pQInfo
,
k
))
{
if
(
!
multiTableMultioutputHelper
(
pQInfo
,
pQInfo
->
tableIndex
))
{
pQInfo
->
tableIndex
++
;
continue
;
}
...
...
@@ -4696,7 +4580,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
}
}
pQInfo
->
subgroupId
x
=
0
;
pQInfo
->
groupInde
x
=
0
;
pQuery
->
rec
.
rows
=
0
;
copyFromWindowResToSData
(
pQInfo
,
pWindowResInfo
->
pResult
);
}
...
...
@@ -4773,7 +4657,7 @@ static void doRestoreContext(SQInfo* pQInfo) {
pRuntimeEnv
->
pTSBuf
->
cur
.
order
=
pRuntimeEnv
->
pTSBuf
->
cur
.
order
^
1
;
}
enableFuncForForwardScan
(
pRuntimeEnv
,
pQuery
->
order
.
order
);
switchCtxOrder
(
pRuntimeEnv
);
SET_MASTER_SCAN_FLAG
(
pRuntimeEnv
);
}
...
...
@@ -4806,9 +4690,9 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
if
(
pQInfo
->
subgroupId
x
>
0
)
{
if
(
pQInfo
->
groupInde
x
>
0
)
{
/*
* if the
subgroupId
x > 0, the query process must be completed yet, we only need to
* if the
groupInde
x > 0, the query process must be completed yet, we only need to
* copy the data into output buffer
*/
if
(
isIntervalQuery
(
pQuery
))
{
...
...
@@ -4870,7 +4754,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
}
if
(
isIntervalQuery
(
pQuery
)
||
isSumAvgRateQuery
(
pQuery
))
{
// assert(pSupporter->
subgroupId
x == 0 && pSupporter->numOfGroupResultPages == 0);
// assert(pSupporter->
groupInde
x == 0 && pSupporter->numOfGroupResultPages == 0);
if
(
mergeIntoGroupResult
(
pQInfo
)
==
TSDB_CODE_SUCCESS
)
{
copyResToQueryResultBuf
(
pQInfo
,
pQuery
);
...
...
@@ -5008,11 +4892,11 @@ static void tableIntervalProcess(SQInfo *pQInfo) {
tableIntervalProcessImpl
(
pRuntimeEnv
);
if
(
isIntervalQuery
(
pQuery
))
{
pQInfo
->
subgroupId
x
=
0
;
// always start from 0
pQInfo
->
groupInde
x
=
0
;
// always start from 0
pQuery
->
rec
.
rows
=
0
;
copyFromWindowResToSData
(
pQInfo
,
pRuntimeEnv
->
windowResInfo
.
pResult
);
clearFirstNTimeWindow
(
pRuntimeEnv
,
pQInfo
->
subgroupId
x
);
clearFirstNTimeWindow
(
pRuntimeEnv
,
pQInfo
->
groupInde
x
);
}
// the offset is handled at prepare stage if no interpolation involved
...
...
@@ -5044,10 +4928,10 @@ static void tableIntervalProcess(SQInfo *pQInfo) {
// all data scanned, the group by normal column can return
if
(
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
))
{
// todo refactor with merge interval time result
pQInfo
->
subgroupId
x
=
0
;
pQInfo
->
groupInde
x
=
0
;
pQuery
->
rec
.
rows
=
0
;
copyFromWindowResToSData
(
pQInfo
,
pRuntimeEnv
->
windowResInfo
.
pResult
);
clearFirstNTimeWindow
(
pRuntimeEnv
,
pQInfo
->
subgroupId
x
);
clearFirstNTimeWindow
(
pRuntimeEnv
,
pQInfo
->
groupInde
x
);
}
pQInfo
->
pointsInterpo
+=
numOfInterpo
;
...
...
@@ -5083,13 +4967,13 @@ static void tableQueryImpl(SQInfo* pQInfo) {
// todo limit the output for interval query?
pQuery
->
rec
.
rows
=
0
;
pQInfo
->
subgroupId
x
=
0
;
// always start from 0
pQInfo
->
groupInde
x
=
0
;
// always start from 0
if
(
pRuntimeEnv
->
windowResInfo
.
size
>
0
)
{
copyFromWindowResToSData
(
pQInfo
,
pRuntimeEnv
->
windowResInfo
.
pResult
);
pQuery
->
rec
.
rows
+=
pQuery
->
rec
.
rows
;
clearFirstNTimeWindow
(
pRuntimeEnv
,
pQInfo
->
subgroupId
x
);
clearFirstNTimeWindow
(
pRuntimeEnv
,
pQInfo
->
groupInde
x
);
if
(
pQuery
->
rec
.
rows
>
0
)
{
qTrace
(
"QInfo:%p %d rows returned from group results, total:%d"
,
pQInfo
,
pQuery
->
rec
.
rows
,
pQuery
->
rec
.
total
);
...
...
@@ -5895,13 +5779,6 @@ static void freeQInfo(SQInfo *pQInfo) {
sem_destroy
(
&
(
pQInfo
->
dataReady
));
teardownQueryRuntimeEnv
(
&
pQInfo
->
runtimeEnv
);
// if (pQInfo->pTableDataInfo != NULL) {
// size_t num = taosHashGetSize(pQInfo->groupInfo);
// for (int32_t j = 0; j < 0; ++j) {
// destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols);
// }
// }
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfFilterCols
;
++
i
)
{
SSingleColumnFilterInfo
*
pColFilter
=
&
pQuery
->
pFilterInfo
[
i
];
if
(
pColFilter
->
numOfFilters
>
0
)
{
...
...
@@ -5933,6 +5810,12 @@ static void freeQInfo(SQInfo *pQInfo) {
tfree
(
pQuery
->
pGroupbyExpr
);
tfree
(
pQuery
);
int32_t
numOfGroups
=
taosArrayGetSize
(
pQInfo
->
groupInfo
.
pGroupList
);
for
(
int32_t
i
=
0
;
i
<
numOfGroups
;
++
i
)
{
SArray
*
p
=
taosArrayGetP
(
pQInfo
->
groupInfo
.
pGroupList
,
i
);
taosArrayDestroy
(
p
);
}
taosArrayDestroy
(
pQInfo
->
groupInfo
.
pGroupList
);
qTrace
(
"QInfo:%p QInfo is freed"
,
pQInfo
);
...
...
@@ -6036,7 +5919,7 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo)
}
bool
isSTableQuery
=
false
;
STableGroupInfo
*
groupInfo
=
calloc
(
1
,
sizeof
(
STableGroupInfo
))
;
STableGroupInfo
groupInfo
=
{
0
}
;
if
((
pQueryMsg
->
queryType
&
TSDB_QUERY_TYPE_STABLE_QUERY
)
!=
0
)
{
isSTableQuery
=
true
;
...
...
@@ -6044,8 +5927,8 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo)
STableId
*
id
=
taosArrayGet
(
pTableIdList
,
0
);
id
->
uid
=
-
1
;
//todo fix me
/*int32_t ret =*/
tsdbQuery
Tags
(
tsdb
,
id
->
uid
,
tagCond
,
pQueryMsg
->
tagCondLen
,
groupInfo
,
pGroupColIndex
,
pQueryMsg
->
numOfGroupCols
);
if
(
groupInfo
->
numOfTables
==
0
)
{
// no qualified tables no need to do query
/*int32_t ret =*/
tsdbQuery
ByTagsCond
(
tsdb
,
id
->
uid
,
tagCond
,
pQueryMsg
->
tagCondLen
,
&
groupInfo
,
pGroupColIndex
,
pQueryMsg
->
numOfGroupCols
);
if
(
groupInfo
.
numOfTables
==
0
)
{
// no qualified tables no need to do query
code
=
TSDB_CODE_SUCCESS
;
goto
_query_over
;
}
...
...
@@ -6053,12 +5936,12 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo)
assert
(
taosArrayGetSize
(
pTableIdList
)
==
1
);
STableId
*
id
=
taosArrayGet
(
pTableIdList
,
0
);
if
((
code
=
tsdbGetOneTableGroup
(
tsdb
,
id
->
uid
,
groupInfo
))
!=
TSDB_CODE_SUCCESS
)
{
if
((
code
=
tsdbGetOneTableGroup
(
tsdb
,
id
->
uid
,
&
groupInfo
))
!=
TSDB_CODE_SUCCESS
)
{
goto
_query_over
;
}
}
(
*
pQInfo
)
=
createQInfoImpl
(
pQueryMsg
,
pGroupbyExpr
,
pExprs
,
groupInfo
);
(
*
pQInfo
)
=
createQInfoImpl
(
pQueryMsg
,
pGroupbyExpr
,
pExprs
,
&
groupInfo
);
if
((
*
pQInfo
)
==
NULL
)
{
code
=
TSDB_CODE_SERV_OUT_OF_MEMORY
;
}
...
...
@@ -6066,24 +5949,9 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo)
code
=
initQInfo
(
pQueryMsg
,
tsdb
,
*
pQInfo
,
isSTableQuery
);
_query_over:
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
taosArrayDestroy
(
pTableIdList
);
}
taosArrayDestroy
(
pTableIdList
);
// if failed to add ref for all meters in this query, abort current query
// if (code != TSDB_CODE_SUCCESS) {
// vnodeDecQueryRefCount(pQueryMsg, pMeterObjList, incNumber);
// }
//
// tfree(pQueryMsg->pSqlFuncExprs);
// tfree(pMeterObjList);
// ret = vnodeSendQueryRspMsg(pObj, code, pObj->qhandle);
//
// tfree(pQueryMsg->pSidExtInfo);
// for(int32_t i = 0; i < pQueryMsg->numOfCols; ++i) {
// vnodeFreeColumnInfo(&pQueryMsg->colList[i]);
// }
//
// atomic_fetch_add_32(&vnodeSelectReqNum, 1);
return
TSDB_CODE_SUCCESS
;
}
...
...
src/tsdb/src/tsdbRead.c
浏览文件 @
b4514c86
...
...
@@ -134,7 +134,7 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
pCompBlockLoadInfo
->
fileListIndex
=
-
1
;
}
TsdbQueryHandleT
*
tsdbQueryTables
(
TsdbRepoT
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
SArray
*
pColumnInfo
)
{
TsdbQueryHandleT
*
tsdbQueryTables
(
TsdbRepoT
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
)
{
// todo 1. filter not exist table
// todo 2. add the reference count for each table that is involved in query
...
...
@@ -148,7 +148,7 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable
pQueryHandle
->
cur
.
fid
=
-
1
;
size_t
sizeOfGroup
=
taosArrayGetSize
(
groupList
->
pGroupList
);
assert
(
sizeOfGroup
>=
1
);
assert
(
sizeOfGroup
>=
1
&&
pCond
!=
NULL
&&
pCond
->
numOfCols
>
0
);
pQueryHandle
->
pTableCheckInfo
=
taosArrayInit
(
groupList
->
numOfTables
,
sizeof
(
STableCheckInfo
));
...
...
@@ -182,16 +182,15 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable
pQueryHandle
->
activeIndex
=
0
;
// allocate buffer in order to load data blocks from file
int32_t
numOfCols
=
taosArrayGetSize
(
pColumnInfo
)
;
int32_t
numOfCols
=
pCond
->
numOfCols
;
size_t
bufferCapacity
=
4096
;
pQueryHandle
->
pColumns
=
taosArrayInit
(
numOfCols
,
sizeof
(
SColumnInfoData
));
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
*
pCol
=
taosArrayGet
(
pColumnInfo
,
i
);
for
(
int32_t
i
=
0
;
i
<
pCond
->
numOfCols
;
++
i
)
{
SColumnInfoData
pDest
=
{{
0
},
0
};
pDest
.
pData
=
calloc
(
1
,
EXTRA_BYTES
+
bufferCapacity
*
pCol
->
info
.
bytes
)
;
pDest
.
info
=
pCol
->
info
;
pDest
.
info
=
pCond
->
colList
[
i
].
info
;
pDest
.
pData
=
calloc
(
1
,
EXTRA_BYTES
+
bufferCapacity
*
pCond
->
colList
[
i
].
info
.
bytes
)
;
taosArrayPush
(
pQueryHandle
->
pColumns
,
&
pDest
);
}
...
...
@@ -430,9 +429,6 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
taosArrayDestroy
(
sa
);
tfree
(
data
);
// TSKEY* d = (TSKEY*)pCheckInfo->pDataCols->cols[PRIMARYKEY_TIMESTAMP_COL_INDEX].pData;
// assert(d[0] == pBlock->keyFirst && d[pBlock->numOfPoints - 1] == pBlock->keyLast);
return
blockLoaded
;
}
...
...
@@ -587,7 +583,7 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf
}
}
//
int32_t start = MIN(cur->pos, endPos);
int32_t
start
=
MIN
(
cur
->
pos
,
endPos
);
// move the data block in the front to data block if needed
int32_t
numOfCols
=
QH_GET_NUM_OF_COLS
(
pQueryHandle
);
...
...
@@ -600,9 +596,9 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf
if
(
pCol
->
info
.
colId
==
colId
)
{
// SDataCol* pDataCol = &pCols->cols[i];
pCol
->
pData
=
pQueryHandle
->
rhelper
.
pDataCols
[
0
]
->
cols
[
i
].
pData
;
// memmove(pCol->pData, pDataCol->
pData + pCol->info.bytes * start,
//
pQueryHandle->realNumOfRows * pCol->info.bytes);
// pCol->pData = pQueryHandle->rhelper.pDataCols[0]->cols[i].pData + pCol->info.bytes * start
;
memmove
(
pCol
->
pData
,
pQueryHandle
->
rhelper
.
pDataCols
[
0
]
->
cols
[
i
].
pData
+
pCol
->
info
.
bytes
*
start
,
pQueryHandle
->
realNumOfRows
*
pCol
->
info
.
bytes
);
break
;
}
}
...
...
@@ -941,7 +937,6 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pqHandle) {
return
getDataBlocksInFiles
(
pQueryHandle
);
}
}
static
int
tsdbReadRowsFromCache
(
SSkipListIterator
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
TSKEY
*
skey
,
TSKEY
*
ekey
,
...
...
@@ -1053,7 +1048,7 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) {
if
(
pTable
->
mem
!=
NULL
)
{
// create mem table iterator if it is not created yet
assert
(
pCheckInfo
->
iter
!=
NULL
);
rows
=
tsdbReadRowsFromCache
(
pCheckInfo
->
iter
,
pHandle
->
window
.
ekey
,
2
,
&
skey
,
&
ekey
,
pHandle
);
rows
=
tsdbReadRowsFromCache
(
pCheckInfo
->
iter
,
pHandle
->
window
.
ekey
,
4000
,
&
skey
,
&
ekey
,
pHandle
);
// update the last key value
pCheckInfo
->
lastKey
=
ekey
+
step
;
...
...
@@ -1117,10 +1112,6 @@ int32_t tsdbResetQuery(TsdbQueryHandleT* pQueryHandle, STimeWindow* window, Tsdb
return
0
;
}
int32_t
tsdbDataBlockSeek
(
TsdbQueryHandleT
*
pQueryHandle
,
TsdbPosT
pos
)
{
return
0
;
}
TsdbPosT
tsdbDataBlockTell
(
TsdbQueryHandleT
*
pQueryHandle
)
{
return
NULL
;
}
SArray
*
tsdbRetrieveDataRow
(
TsdbQueryHandleT
*
pQueryHandle
,
SArray
*
pIdList
,
SQueryRowCond
*
pCond
)
{
return
NULL
;
}
TsdbQueryHandleT
*
tsdbQueryFromTagConds
(
STsdbQueryCond
*
pCond
,
int16_t
stableId
,
const
char
*
pTagFilterStr
)
{
...
...
@@ -1266,12 +1257,14 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
SColIndex
*
pColIndex
=
&
pTableGroupSupp
->
pCols
[
i
];
int32_t
colIndex
=
pColIndex
->
colIndex
;
assert
(
colIndex
>=
0
&&
colIndex
<
schemaNCols
(
pTableGroupSupp
->
pTagSchema
));
char
*
f1
=
NULL
;
char
*
f2
=
NULL
;
int32_t
type
=
0
;
int32_t
bytes
=
0
;
if
(
colIndex
==
-
1
)
{
// t
able name, todo fix
me
if
(
colIndex
==
-
1
)
{
// t
odo fix me, table na
me
// f1 = s1->tags;
// f2 = s2->tags;
type
=
TSDB_DATA_TYPE_BINARY
;
...
...
@@ -1438,7 +1431,7 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr)
return
TSDB_CODE_SUCCESS
;
}
int32_t
tsdbQuery
Tags
(
TsdbRepoT
*
tsdb
,
int64_t
uid
,
const
char
*
pTagCond
,
size_t
len
,
STableGroupInfo
*
pGroupInfo
,
int32_t
tsdbQuery
ByTagsCond
(
TsdbRepoT
*
tsdb
,
int64_t
uid
,
const
char
*
pTagCond
,
size_t
len
,
STableGroupInfo
*
pGroupInfo
,
SColIndex
*
pColIndex
,
int32_t
numOfCols
)
{
STable
*
pSTable
=
tsdbGetTableByUid
(
tsdbGetMeta
(
tsdb
),
uid
);
...
...
@@ -1520,11 +1513,11 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
taosArrayDestroy
(
pQueryHandle
->
pTableCheckInfo
);
tfree
(
pQueryHandle
->
compIndex
);
//
size_t cols = taosArrayGetSize(pQueryHandle->pColumns);
//
for (int32_t i = 0; i < cols; ++i) {
//
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
// //
tfree(pColInfo->pData);
//
}
size_t
cols
=
taosArrayGetSize
(
pQueryHandle
->
pColumns
);
for
(
int32_t
i
=
0
;
i
<
cols
;
++
i
)
{
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pQueryHandle
->
pColumns
,
i
);
tfree
(
pColInfo
->
pData
);
}
taosArrayDestroy
(
pQueryHandle
->
pColumns
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录