Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
1d5bab8d
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
1d5bab8d
编写于
3月 21, 2020
作者:
H
hjxilinx
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[td-32] refactor codes, change the prepare qinfo functions
上级
f19c9742
变更
10
显示空白变更内容
内联
并排
Showing
10 changed file
with
1701 addition
and
968 deletion
+1701
-968
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+1
-1
src/client/src/tscLocal.c
src/client/src/tscLocal.c
+4
-3
src/client/src/tscServer.c
src/client/src/tscServer.c
+14
-17
src/client/src/tscSub.c
src/client/src/tscSub.c
+1
-1
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+2
-2
src/dnode/src/dnodeRead.c
src/dnode/src/dnodeRead.c
+11
-4
src/inc/taosmsg.h
src/inc/taosmsg.h
+4
-8
src/query/inc/qextbuffer.h
src/query/inc/qextbuffer.h
+1
-1
src/query/inc/queryExecutor.h
src/query/inc/queryExecutor.h
+2
-2
src/query/src/queryExecutor.c
src/query/src/queryExecutor.c
+1661
-929
未找到文件。
src/client/inc/tscUtil.h
浏览文件 @
1d5bab8d
...
...
@@ -85,7 +85,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList,
STableDataBlocks
**
dataBlocks
);
SVnodeSidList
*
tscGetVnodeSidList
(
SSuperTableMeta
*
pMetricmeta
,
int32_t
vnodeIdx
);
STable
SidExt
Info
*
tscGetMeterSidInfo
(
SVnodeSidList
*
pSidList
,
int32_t
idx
);
STable
Id
Info
*
tscGetMeterSidInfo
(
SVnodeSidList
*
pSidList
,
int32_t
idx
);
/**
*
...
...
src/client/src/tscLocal.c
浏览文件 @
1d5bab8d
...
...
@@ -318,15 +318,16 @@ static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) {
SVnodeSidList
*
pSidList
=
(
SVnodeSidList
*
)((
char
*
)
pMetricMeta
+
pMetricMeta
->
list
[
i
]);
for
(
int32_t
j
=
0
;
j
<
pSidList
->
numOfSids
;
++
j
)
{
STable
SidExt
Info
*
pSidExt
=
tscGetMeterSidInfo
(
pSidList
,
j
);
STable
Id
Info
*
pSidExt
=
tscGetMeterSidInfo
(
pSidList
,
j
);
for
(
int32_t
k
=
0
;
k
<
pQueryInfo
->
fieldsInfo
.
numOfOutputCols
;
++
k
)
{
SColIndexEx
*
pColIndex
=
&
tscSqlExprGet
(
pQueryInfo
,
k
)
->
colInfo
;
int16_t
offsetId
=
pColIndex
->
colIdx
;
assert
((
pColIndex
->
flag
&
TSDB_COL_TAG
)
!=
0
);
assert
(
0
);
char
*
val
=
pSidExt
->
tags
+
vOffset
[
offsetId
];
char
*
val
=
NULL
;
//
pSidExt->tags + vOffset[offsetId];
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
pQueryInfo
,
k
);
memcpy
(
pRes
->
data
+
tscFieldInfoGetOffset
(
pQueryInfo
,
k
)
*
totalNumOfResults
+
pField
->
bytes
*
rowIdx
,
val
,
...
...
src/client/src/tscServer.c
浏览文件 @
1d5bab8d
...
...
@@ -598,7 +598,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
SSuperTableMeta
*
pMetricMeta
=
pTableMetaInfo
->
pMetricMeta
;
SVnodeSidList
*
pVnodeSidList
=
tscGetVnodeSidList
(
pMetricMeta
,
pTableMetaInfo
->
vnodeIndex
);
int32_t
meterInfoSize
=
(
pMetricMeta
->
tagLen
+
sizeof
(
STable
SidExt
Info
))
*
pVnodeSidList
->
numOfSids
;
int32_t
meterInfoSize
=
(
pMetricMeta
->
tagLen
+
sizeof
(
STable
Id
Info
))
*
pVnodeSidList
->
numOfSids
;
int32_t
outputColumnSize
=
pQueryInfo
->
exprsInfo
.
numOfExprs
*
sizeof
(
SSqlFuncExprMsg
);
int32_t
size
=
meterInfoSize
+
outputColumnSize
+
srcColListSize
+
exprSize
+
MIN_QUERY_MSG_PKT_SIZE
;
...
...
@@ -620,26 +620,23 @@ static char *doSerializeTableInfo(SSqlObj *pSql, int32_t numOfTables, int32_t vn
#ifdef _DEBUG_VIEW
tscTrace
(
"%p sid:%d, uid:%"
PRIu64
,
pSql
,
pTableMetaInfo
->
pTableMeta
->
sid
,
pTableMetaInfo
->
pTableMeta
->
uid
);
#endif
STable
SidExtInfo
*
pTableMetaInfo
=
(
STableSidExt
Info
*
)
pMsg
;
STable
IdInfo
*
pTableMetaInfo
=
(
STableId
Info
*
)
pMsg
;
pTableMetaInfo
->
sid
=
htonl
(
pTableMeta
->
sid
);
pTableMetaInfo
->
uid
=
htobe64
(
pTableMeta
->
uid
);
pTableMetaInfo
->
key
=
htobe64
(
tscGetSubscriptionProgress
(
pSql
->
pSubscription
,
pTableMeta
->
uid
));
pMsg
+=
sizeof
(
STable
SidExt
Info
);
pMsg
+=
sizeof
(
STable
Id
Info
);
}
else
{
SVnodeSidList
*
pVnodeSidList
=
tscGetVnodeSidList
(
pMetricMeta
,
pTableMetaInfo
->
vnodeIndex
);
for
(
int32_t
i
=
0
;
i
<
numOfTables
;
++
i
)
{
STable
SidExtInfo
*
pTableMetaInfo
=
(
STableSidExt
Info
*
)
pMsg
;
STable
SidExt
Info
*
pQueryMeterInfo
=
tscGetMeterSidInfo
(
pVnodeSidList
,
i
);
STable
IdInfo
*
pTableIdInfo
=
(
STableId
Info
*
)
pMsg
;
STable
Id
Info
*
pQueryMeterInfo
=
tscGetMeterSidInfo
(
pVnodeSidList
,
i
);
pTable
Meta
Info
->
sid
=
htonl
(
pQueryMeterInfo
->
sid
);
pTable
Meta
Info
->
uid
=
htobe64
(
pQueryMeterInfo
->
uid
);
pTable
Meta
Info
->
key
=
htobe64
(
tscGetSubscriptionProgress
(
pSql
->
pSubscription
,
pQueryMeterInfo
->
uid
));
pTable
Id
Info
->
sid
=
htonl
(
pQueryMeterInfo
->
sid
);
pTable
Id
Info
->
uid
=
htobe64
(
pQueryMeterInfo
->
uid
);
pTable
Id
Info
->
key
=
htobe64
(
tscGetSubscriptionProgress
(
pSql
->
pSubscription
,
pQueryMeterInfo
->
uid
));
pMsg
+=
sizeof
(
STableSidExtInfo
);
memcpy
(
pMsg
,
pQueryMeterInfo
->
tags
,
pMetricMeta
->
tagLen
);
pMsg
+=
pMetricMeta
->
tagLen
;
pMsg
+=
sizeof
(
STableIdInfo
);
#ifdef _DEBUG_VIEW
tscTrace
(
"%p sid:%d, uid:%"
PRId64
,
pSql
,
pQueryMeterInfo
->
sid
,
pQueryMeterInfo
->
uid
);
...
...
@@ -2067,7 +2064,7 @@ int tscProcessMetricMetaRsp(SSqlObj *pSql) {
pMeta
->
numOfVnodes
=
htonl
(
pMeta
->
numOfVnodes
);
pMeta
->
tagLen
=
htons
(
pMeta
->
tagLen
);
size
+=
pMeta
->
numOfVnodes
*
sizeof
(
SVnodeSidList
*
)
+
pMeta
->
numOfTables
*
sizeof
(
STable
SidExt
Info
*
);
size
+=
pMeta
->
numOfVnodes
*
sizeof
(
SVnodeSidList
*
)
+
pMeta
->
numOfTables
*
sizeof
(
STable
Id
Info
*
);
char
*
pBuf
=
calloc
(
1
,
size
);
if
(
pBuf
==
NULL
)
{
...
...
@@ -2093,16 +2090,16 @@ int tscProcessMetricMetaRsp(SSqlObj *pSql) {
tscTrace
(
"%p metricmeta:vid:%d,numOfTables:%d"
,
pSql
,
i
,
pLists
->
numOfSids
);
pBuf
+=
sizeof
(
SVnodeSidList
)
+
sizeof
(
STable
SidExt
Info
*
)
*
pSidLists
->
numOfSids
;
pBuf
+=
sizeof
(
SVnodeSidList
)
+
sizeof
(
STable
Id
Info
*
)
*
pSidLists
->
numOfSids
;
rsp
+=
sizeof
(
SVnodeSidList
);
size_t
elemSize
=
sizeof
(
STable
SidExt
Info
)
+
pNewMetricMeta
->
tagLen
;
size_t
elemSize
=
sizeof
(
STable
Id
Info
)
+
pNewMetricMeta
->
tagLen
;
for
(
int32_t
j
=
0
;
j
<
pSidLists
->
numOfSids
;
++
j
)
{
pLists
->
pSidExtInfoList
[
j
]
=
pBuf
-
(
char
*
)
pLists
;
memcpy
(
pBuf
,
rsp
,
elemSize
);
((
STable
SidExtInfo
*
)
pBuf
)
->
uid
=
htobe64
(((
STableSidExt
Info
*
)
pBuf
)
->
uid
);
((
STable
SidExtInfo
*
)
pBuf
)
->
sid
=
htonl
(((
STableSidExt
Info
*
)
pBuf
)
->
sid
);
((
STable
IdInfo
*
)
pBuf
)
->
uid
=
htobe64
(((
STableId
Info
*
)
pBuf
)
->
uid
);
((
STable
IdInfo
*
)
pBuf
)
->
sid
=
htonl
(((
STableId
Info
*
)
pBuf
)
->
sid
);
rsp
+=
elemSize
;
pBuf
+=
elemSize
;
...
...
src/client/src/tscSub.c
浏览文件 @
1d5bab8d
...
...
@@ -202,7 +202,7 @@ int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
for
(
int32_t
i
=
0
;
i
<
pMetricMeta
->
numOfVnodes
;
i
++
)
{
SVnodeSidList
*
pVnodeSidList
=
tscGetVnodeSidList
(
pMetricMeta
,
i
);
for
(
int32_t
j
=
0
;
j
<
pVnodeSidList
->
numOfSids
;
j
++
)
{
STable
SidExt
Info
*
pTableMetaInfo
=
tscGetMeterSidInfo
(
pVnodeSidList
,
j
);
STable
Id
Info
*
pTableMetaInfo
=
tscGetMeterSidInfo
(
pVnodeSidList
,
j
);
int64_t
uid
=
pTableMetaInfo
->
uid
;
progress
[
numOfTables
].
uid
=
uid
;
progress
[
numOfTables
++
].
key
=
tscGetSubscriptionProgress
(
pSub
,
uid
);
...
...
src/client/src/tscUtil.c
浏览文件 @
1d5bab8d
...
...
@@ -191,7 +191,7 @@ SVnodeSidList* tscGetVnodeSidList(SSuperTableMeta* pMetricmeta, int32_t vnodeIdx
return
(
SVnodeSidList
*
)(
pMetricmeta
->
list
[
vnodeIdx
]
+
(
char
*
)
pMetricmeta
);
}
STable
SidExt
Info
*
tscGetMeterSidInfo
(
SVnodeSidList
*
pSidList
,
int32_t
idx
)
{
STable
Id
Info
*
tscGetMeterSidInfo
(
SVnodeSidList
*
pSidList
,
int32_t
idx
)
{
if
(
pSidList
==
NULL
)
{
tscError
(
"illegal sidlist"
);
return
0
;
...
...
@@ -206,7 +206,7 @@ STableSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx) {
assert
(
pSidList
->
pSidExtInfoList
[
idx
]
>=
0
);
return
(
STable
SidExt
Info
*
)(
pSidList
->
pSidExtInfoList
[
idx
]
+
(
char
*
)
pSidList
);
return
(
STable
Id
Info
*
)(
pSidList
->
pSidExtInfoList
[
idx
]
+
(
char
*
)
pSidList
);
}
bool
tscIsTwoStageSTableQuery
(
SQueryInfo
*
pQueryInfo
,
int32_t
tableIndex
)
{
...
...
src/dnode/src/dnodeRead.c
浏览文件 @
1d5bab8d
...
...
@@ -15,13 +15,16 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tlog.h"
#include "tqueue.h"
#include "trpc.h"
#include "dnodeRead.h"
#include "dnodeMgmt.h"
#include "dnodeRead.h"
#include "queryExecutor.h"
typedef
struct
{
int32_t
code
;
...
...
@@ -88,7 +91,7 @@ void dnodeRead(SRpcMsg *pMsg) {
while
(
leftLen
>
0
)
{
SMsgHead
*
pHead
=
(
SMsgHead
*
)
pCont
;
pHead
->
vgId
=
1
;
//htonl(pHead->vgId);
pHead
->
vgId
=
1
;
//htonl(pHead->vgId);
pHead
->
contLen
=
pMsg
->
contLen
;
//htonl(pHead->contLen);
void
*
pVnode
=
dnodeGetVnode
(
pHead
->
vgId
);
...
...
@@ -223,8 +226,12 @@ static void dnodeProcessReadResult(SReadMsg *pRead) {
}
static
void
dnodeProcessQueryMsg
(
SReadMsg
*
pMsg
)
{
void
*
pQInfo
=
(
void
*
)
100
;
dTrace
(
"query msg is disposed, qInfo:%p"
,
pQInfo
);
SQueryTableMsg
*
pQueryTableMsg
=
(
SQueryTableMsg
*
)
pMsg
->
pCont
;
SQInfo
*
pQInfo
=
NULL
;
int32_t
ret
=
qCreateQueryInfo
(
pQueryTableMsg
,
&
pQInfo
);
dTrace
(
"query msg is disposed, qInfo:%p"
,
pQueryTableMsg
);
SQueryTableRsp
*
pRsp
=
(
SQueryTableRsp
*
)
rpcMallocCont
(
sizeof
(
SQueryTableRsp
));
pRsp
->
code
=
0
;
...
...
src/inc/taosmsg.h
浏览文件 @
1d5bab8d
...
...
@@ -434,15 +434,11 @@ typedef struct SColumnInfo {
SColumnFilterInfo
*
filters
;
}
SColumnInfo
;
/*
* enable vnode to understand how to group several tables with different tag;
*/
typedef
struct
STableSidExtInfo
{
typedef
struct
STableIdInfo
{
int32_t
sid
;
int64_t
uid
;
TSKEY
key
;
// key for subscription
char
tags
[];
}
STableSidExtInfo
;
TSKEY
key
;
// last accessed ts, for subscription
}
STableIdInfo
;
typedef
struct
STimeWindow
{
TSKEY
skey
;
...
...
@@ -670,7 +666,7 @@ typedef struct {
SVnodeDesc
vpeerDesc
[
TSDB_VNODES_SUPPORT
];
int16_t
index
;
// used locally
int32_t
numOfSids
;
int32_t
pSidExtInfoList
[];
// offset value of STable
SidExt
Info
int32_t
pSidExtInfoList
[];
// offset value of STable
Id
Info
}
SVnodeSidList
;
typedef
struct
{
...
...
src/query/inc/qextbuffer.h
浏览文件 @
1d5bab8d
...
...
@@ -124,7 +124,7 @@ typedef struct tTagSchema {
typedef
struct
tSidSet
{
int32_t
numOfSids
;
int32_t
numOfSubSet
;
STable
SidExt
Info
**
pSids
;
STable
Id
Info
**
pSids
;
int32_t
*
starterPos
;
// position of each subgroup, generated according to
SColumnModel
*
pColumnModel
;
...
...
src/query/inc/queryExecutor.h
浏览文件 @
1d5bab8d
...
...
@@ -152,7 +152,7 @@ typedef struct SQueryCostSummary {
typedef
struct
SQueryRuntimeEnv
{
SResultInfo
*
resultInfo
;
// todo refactor to merge with SWindowResInfo
SQuery
*
pQuery
;
void
*
pTabObj
;
//
void* pTabObj;
SData
**
pInterpoBuf
;
SQLFunctionCtx
*
pCtx
;
int16_t
numOfRowsPerPage
;
...
...
@@ -204,7 +204,7 @@ typedef struct SQInfo {
* @param pQInfo
* @return
*/
int32_t
qCreateQueryInfo
(
void
*
pRead
Msg
,
SQInfo
**
pQInfo
);
int32_t
qCreateQueryInfo
(
SQueryTableMsg
*
pQueryTable
Msg
,
SQInfo
**
pQInfo
);
/**
* query on single table
...
...
src/query/src/queryExecutor.c
浏览文件 @
1d5bab8d
...
...
@@ -12,21 +12,22 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <qast.h>
#include "os.h"
#include "hash.h"
#include "hashfunc.h"
#include "taosmsg.h"
#include "tlog.h"
#include "tlosertree.h"
#include "tstatus.h"
#include "tscompression.h"
#include "tstatus.h"
#include "ttime.h"
#include "tlog.h"
#include "qresultBuf.h"
#include "queryExecutor.h"
#include "queryUtil.h"
#include "tsdb.h"
#include "qresultBuf.h"
#define DEFAULT_INTERN_BUF_SIZE 16384L
...
...
@@ -36,7 +37,6 @@
*/
#define PRIMARY_TSCOL_LOADED(query) ((query)->colList[0].data.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX)
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0)
#define TSDB_COL_IS_TAG(f) (((f)&TSDB_COL_TAG) != 0)
#define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP)
...
...
@@ -46,9 +46,9 @@
#define SET_SUPPLEMENT_SCAN_FLAG(runtime) ((runtime)->scanFlag = SUPPLEMENTARY_SCAN)
#define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN)
#define GET_QINFO_ADDR(x) ((char*)(x)-offsetof(SQInfo, runtimeEnv))
#define GET_QINFO_ADDR(x) ((char
*)(x)-offsetof(SQInfo, runtimeEnv))
#define GET_COL_DATA_POS(query, index, step) ((query)->pos + (index)
*
(step))
#define GET_COL_DATA_POS(query, index, step) ((query)->pos + (index)
*
(step))
/* get the qinfo struct address from the query struct address */
#define GET_COLUMN_BYTES(query, colidx) \
...
...
@@ -56,7 +56,6 @@
#define GET_COLUMN_TYPE(query, colidx) \
((query)->colList[(query)->pSelectExpr[colidx].pBase.colInfo.colIdxInBuf].data.type)
typedef
struct
SPointInterpoSupporter
{
int32_t
numOfCols
;
char
**
pPrevPoint
;
...
...
@@ -98,13 +97,10 @@ typedef enum {
QUERY_NO_DATA_TO_CHECK
=
0x8u
,
}
vnodeQueryStatus
;
static
void
setQueryStatus
(
SQuery
*
pQuery
,
int8_t
status
);
bool
isIntervalQuery
(
SQuery
*
pQuery
)
{
return
pQuery
->
intervalTime
>
0
;
}
int32_t
setQueryCtxForTableQuery
(
void
*
pReadMsg
,
SQInfo
**
pQInfo
)
{
}
int32_t
setQueryCtxForTableQuery
(
void
*
pReadMsg
,
SQInfo
**
pQInfo
)
{}
enum
{
TS_JOIN_TS_EQUAL
=
0
,
...
...
@@ -112,7 +108,8 @@ enum {
TS_JOIN_TAG_NOT_EQUALS
=
2
,
};
static
int32_t
doMergeMetersResultsToGroupRes
(
SQInfo
*
pQInfo
,
STableDataInfo
*
pTableDataInfo
,
int32_t
start
,
int32_t
end
);
static
int32_t
doMergeMetersResultsToGroupRes
(
SQInfo
*
pQInfo
,
STableDataInfo
*
pTableDataInfo
,
int32_t
start
,
int32_t
end
);
static
void
setWindowResOutputBuf
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SWindowResult
*
pResult
);
...
...
@@ -234,19 +231,19 @@ bool getNeighborPoints(SQInfo *pQInfo, void *pMeterObj, SPointInterpoSupporter *
return
true
;
}
bool
vnodeDoFilterData
(
SQuery
*
pQuery
,
int32_t
elemPos
)
{
bool
vnodeDoFilterData
(
SQuery
*
pQuery
,
int32_t
elemPos
)
{
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfFilterCols
;
++
k
)
{
SSingleColumnFilterInfo
*
pFilterInfo
=
&
pQuery
->
pFilterInfo
[
k
];
char
*
pElem
=
pFilterInfo
->
pData
+
pFilterInfo
->
info
.
info
.
bytes
*
elemPos
;
char
*
pElem
=
pFilterInfo
->
pData
+
pFilterInfo
->
info
.
info
.
bytes
*
elemPos
;
if
(
isNull
(
pElem
,
pFilterInfo
->
info
.
info
.
type
))
{
if
(
isNull
(
pElem
,
pFilterInfo
->
info
.
info
.
type
))
{
return
false
;
}
int32_t
num
=
pFilterInfo
->
numOfFilters
;
bool
qualified
=
false
;
for
(
int32_t
j
=
0
;
j
<
num
;
++
j
)
{
SColumnFilterElem
*
pFilterElem
=
&
pFilterInfo
->
pFilters
[
j
];
for
(
int32_t
j
=
0
;
j
<
num
;
++
j
)
{
SColumnFilterElem
*
pFilterElem
=
&
pFilterInfo
->
pFilters
[
j
];
if
(
pFilterElem
->
fp
(
pFilterElem
,
pElem
,
pElem
))
{
qualified
=
true
;
break
;
...
...
@@ -261,7 +258,7 @@ bool vnodeDoFilterData(SQuery* pQuery, int32_t elemPos) {
return
true
;
}
bool
vnodeFilterData
(
SQuery
*
pQuery
,
int32_t
*
numOfActualRead
,
int32_t
index
)
{
bool
vnodeFilterData
(
SQuery
*
pQuery
,
int32_t
*
numOfActualRead
,
int32_t
index
)
{
(
*
numOfActualRead
)
++
;
if
(
!
vnodeDoFilterData
(
pQuery
,
index
))
{
return
false
;
...
...
@@ -401,9 +398,9 @@ bool doRevisedResultsByLimit(SQInfo *pQInfo) {
static
bool
queryPaused
(
SQuery
*
pQuery
,
SDataBlockInfo
*
pDataBlockInfo
,
int32_t
forwardStep
)
{
// output buffer is full, pause current query
if
(
Q_STATUS_EQUAL
(
pQuery
->
status
,
QUERY_RESBUF_FULL
))
{
// assert((QUERY_IS_ASC_QUERY(pQuery) && forwardStep + pQuery->pos <= pDataBlockInfo->size) ||
// (!QUERY_IS_ASC_QUERY(pQuery) && pQuery->pos - forwardStep + 1 >= 0));
//
// assert((QUERY_IS_ASC_QUERY(pQuery) && forwardStep + pQuery->pos <= pDataBlockInfo->size) ||
// (!QUERY_IS_ASC_QUERY(pQuery) && pQuery->pos - forwardStep + 1 >= 0));
//
return
true
;
}
...
...
@@ -429,7 +426,8 @@ static bool isTopBottomQuery(SQuery *pQuery) {
return
false
;
}
static
SDataStatis
*
getStatisInfo
(
SQuery
*
pQuery
,
SDataStatis
*
pStatis
,
SDataBlockInfo
*
pDataBlockInfo
,
int32_t
columnIndex
)
{
static
SDataStatis
*
getStatisInfo
(
SQuery
*
pQuery
,
SDataStatis
*
pStatis
,
SDataBlockInfo
*
pDataBlockInfo
,
int32_t
columnIndex
)
{
// no SField info exist, or column index larger than the output column, no result.
if
(
pStatis
==
NULL
)
{
return
NULL
;
...
...
@@ -696,7 +694,8 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey,
}
static
int32_t
getNumOfRowsInTimeWindow
(
SQuery
*
pQuery
,
SDataBlockInfo
*
pDataBlockInfo
,
TSKEY
*
pPrimaryColumn
,
int32_t
startPos
,
TSKEY
ekey
,
__block_search_fn_t
searchFn
,
bool
updateLastKey
)
{
int32_t
startPos
,
TSKEY
ekey
,
__block_search_fn_t
searchFn
,
bool
updateLastKey
)
{
assert
(
startPos
>=
0
&&
startPos
<
pDataBlockInfo
->
size
);
int32_t
num
=
-
1
;
...
...
@@ -743,7 +742,7 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo
}
static
void
doBlockwiseApplyFunctions
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SWindowStatus
*
pStatus
,
STimeWindow
*
pWin
,
int32_t
startPos
,
int32_t
forwardStep
,
TSKEY
*
tsBuf
)
{
int32_t
startPos
,
int32_t
forwardStep
,
TSKEY
*
tsBuf
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQLFunctionCtx
*
pCtx
=
pRuntimeEnv
->
pCtx
;
...
...
@@ -869,10 +868,10 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfCols
;
++
i
)
{
SColumnInfo
*
pColMsg
=
&
pQuery
->
colList
[
i
].
info
;
assert
(
0
);
// char * pData = doGetDataBlocks(pQuery, pRuntimeEnv->colDataBuffer, pQuery->colList[i].colIdxInBuf);
// char * pData = doGetDataBlocks(pQuery, pRuntimeEnv->colDataBuffer, pQuery->colList[i].colIdxInBuf);
sas
->
elemSize
[
i
]
=
pColMsg
->
bytes
;
// sas->data[i] = pData + pCtx->startOffset * sas->elemSize[i]; // start from the offset
// sas->data[i] = pData + pCtx->startOffset * sas->elemSize[i]; // start from the offset
}
sas
->
numOfCols
=
pQuery
->
numOfCols
;
...
...
@@ -930,7 +929,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataSt
primaryKeyCol
=
(
TSKEY
*
)(
pColInfo
->
pData
);
}
pQuery
->
pos
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
0
:
pDataBlockInfo
->
size
-
1
;
pQuery
->
pos
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
0
:
pDataBlockInfo
->
size
-
1
;
int64_t
prevNumOfRes
=
getNumOfResult
(
pRuntimeEnv
);
SArithmeticSupport
*
sasArray
=
calloc
((
size_t
)
pQuery
->
numOfOutputCols
,
sizeof
(
SArithmeticSupport
));
...
...
@@ -953,9 +952,10 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataSt
STimeWindow
win
=
getActiveTimeWindow
(
pWindowResInfo
,
ts
,
pQuery
);
assert
(
0
);
// if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pTabObj->sid, &win) != TSDB_CODE_SUCCESS) {
// return 0;
// }
// if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pTabObj->sid, &win) !=
// TSDB_CODE_SUCCESS) {
// return 0;
// }
TSKEY
ekey
=
reviseWindowEkey
(
pQuery
,
&
win
);
int32_t
forwardStep
=
...
...
@@ -975,7 +975,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataSt
}
// null data, failed to allocate more memory buffer
// int32_t sid = pRuntimeEnv->pTabObj->sid;
// int32_t sid = pRuntimeEnv->pTabObj->sid;
int32_t
sid
=
0
;
assert
(
0
);
if
(
setWindowOutputBufByKey
(
pRuntimeEnv
,
pWindowResInfo
,
sid
,
&
nextWin
)
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -1070,7 +1070,7 @@ static char *getGroupbyColumnData(SQuery *pQuery, SData **data, int16_t *type, i
*
type
=
pQuery
->
colList
[
colIndex
].
info
.
type
;
*
bytes
=
pQuery
->
colList
[
colIndex
].
info
.
bytes
;
// groupbyColumnData = doGetDataBlocks(pQuery, data, pQuery->colList[colIndex].inf);
// groupbyColumnData = doGetDataBlocks(pQuery, data, pQuery->colList[colIndex].inf);
break
;
}
...
...
@@ -1131,13 +1131,14 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
return
true
;
}
static
int32_t
rowwiseApplyAllFunctions
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SDataStatis
*
pStatis
,
SDataBlockInfo
*
pDataBlockInfo
,
SWindowResInfo
*
pWindowResInfo
,
SArray
*
pDataBlock
)
{
static
int32_t
rowwiseApplyAllFunctions
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SDataStatis
*
pStatis
,
SDataBlockInfo
*
pDataBlockInfo
,
SWindowResInfo
*
pWindowResInfo
,
SArray
*
pDataBlock
)
{
SQLFunctionCtx
*
pCtx
=
pRuntimeEnv
->
pCtx
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
TSKEY
*
primaryKeyCol
=
(
TSKEY
*
)
taosArrayGet
(
pDataBlock
,
0
);
// SData **data = pRuntimeEnv->colDataBuffer;
// SData **data = pRuntimeEnv->colDataBuffer;
int64_t
prevNumOfRes
=
0
;
bool
groupbyStateValue
=
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
);
...
...
@@ -1154,19 +1155,19 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat
char
*
groupbyColumnData
=
NULL
;
if
(
groupbyStateValue
)
{
assert
(
0
);
// groupbyColumnData = getGroupbyColumnData(pQuery, data, &type, &bytes);
// groupbyColumnData = getGroupbyColumnData(pQuery, data, &type, &bytes);
}
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutputCols
;
++
k
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
k
].
pBase
.
functionId
;
SDataStatis
*
pColStatis
=
NULL
;
SDataStatis
*
pColStatis
=
NULL
;
bool
hasNull
=
hasNullValue
(
pQuery
,
k
,
pDataBlockInfo
,
pStatis
,
&
pColStatis
);
char
*
dataBlock
=
getDataBlocks
(
pRuntimeEnv
,
&
sasArray
[
k
],
k
,
pDataBlockInfo
->
size
,
pDataBlock
);
setExecParams
(
pQuery
,
&
pCtx
[
k
],
dataBlock
,
(
char
*
)
primaryKeyCol
,
pDataBlockInfo
->
size
,
functionId
,
pColStatis
,
hasNull
,
&
sasArray
[
k
],
pRuntimeEnv
->
scanFlag
);
setExecParams
(
pQuery
,
&
pCtx
[
k
],
dataBlock
,
(
char
*
)
primaryKeyCol
,
pDataBlockInfo
->
size
,
functionId
,
pColStatis
,
hasNull
,
&
sasArray
[
k
],
pRuntimeEnv
->
scanFlag
);
}
// set the input column data
...
...
@@ -1177,7 +1178,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat
* NOTE: here the tbname/tags column cannot reach here, since it will never be a filter column,
* so we do NOT check if is a tag or not
*/
// pFilterInfo->pData = doGetDataBlocks(pQuery, data, pFilterInfo->info.colIdxInBuf);
// pFilterInfo->pData = doGetDataBlocks(pQuery, data, pFilterInfo->info.colIdxInBuf);
}
int32_t
numOfRes
=
0
;
...
...
@@ -1220,7 +1221,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat
assert
(
0
);
int32_t
ret
=
0
;
// int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pTabObj->sid, &win);
// int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pTabObj->sid, &win);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
// null data, too many state code
continue
;
}
...
...
@@ -1235,11 +1236,12 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat
STimeWindow
nextWin
=
win
;
int32_t
index
=
pWindowResInfo
->
curIndex
;
assert
(
0
);
int32_t
sid
=
0
;
//
pRuntimeEnv->pTabObj->sid;
int32_t
sid
=
0
;
//
pRuntimeEnv->pTabObj->sid;
while
(
1
)
{
getNextTimeWindow
(
pQuery
,
&
nextWin
);
if
(
pWindowResInfo
->
startTime
>
nextWin
.
skey
||
(
nextWin
.
skey
>
pQuery
->
window
.
ekey
&&
QUERY_IS_ASC_QUERY
(
pQuery
))
||
if
(
pWindowResInfo
->
startTime
>
nextWin
.
skey
||
(
nextWin
.
skey
>
pQuery
->
window
.
ekey
&&
QUERY_IS_ASC_QUERY
(
pQuery
))
||
(
nextWin
.
skey
>
pQuery
->
window
.
skey
&&
!
QUERY_IS_ASC_QUERY
(
pQuery
)))
{
break
;
}
...
...
@@ -1298,7 +1300,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat
if
((
pQuery
->
checkBufferInLoop
==
1
)
&&
(
++
numOfRes
)
>=
pQuery
->
pointsOffset
)
{
pQuery
->
lastKey
=
lastKey
+
step
;
assert
(
0
);
// *forwardStep = j + 1;
// *forwardStep = j + 1;
break
;
}
}
...
...
@@ -1479,12 +1481,9 @@ static void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool i
}
}
static
int32_t
setupQueryRuntimeEnv
(
void
*
pMeterObj
,
SQuery
*
pQuery
,
SQueryRuntimeEnv
*
pRuntimeEnv
,
SColumnModel
*
pTagsSchema
,
int16_t
order
,
bool
isSTableQuery
)
{
dTrace
(
"QInfo:%p setup runtime env"
,
GET_QINFO_ADDR
(
pQuery
));
pRuntimeEnv
->
pTabObj
=
pMeterObj
;
pRuntimeEnv
->
pQuery
=
pQuery
;
static
int32_t
setupQueryRuntimeEnv
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SColumnModel
*
pTagsSchema
,
int16_t
order
,
bool
isSTableQuery
)
{
dTrace
(
"QInfo:%p setup runtime env"
,
GET_QINFO_ADDR
(
pRuntimeEnv
));
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
pRuntimeEnv
->
resultInfo
=
calloc
(
pQuery
->
numOfOutputCols
,
sizeof
(
SResultInfo
));
pRuntimeEnv
->
pCtx
=
(
SQLFunctionCtx
*
)
calloc
(
pQuery
->
numOfOutputCols
,
sizeof
(
SQLFunctionCtx
));
...
...
@@ -1507,8 +1506,8 @@ static int32_t setupQueryRuntimeEnv(void *pMeterObj, SQuery *pQuery, SQueryRunti
pCtx
->
inputBytes
=
pSchema
->
bytes
;
}
else
{
assert
(
0
);
// pCtx->inputType = GET_COLUMN_TYPE(pQuery, i);
// pCtx->inputBytes = GET_COLUMN_BYTES(pQuery, i);
// pCtx->inputType = GET_COLUMN_TYPE(pQuery, i);
// pCtx->inputBytes = GET_COLUMN_BYTES(pQuery, i);
}
pCtx
->
ptsOutputBuf
=
NULL
;
...
...
@@ -1561,10 +1560,10 @@ static int32_t setupQueryRuntimeEnv(void *pMeterObj, SQuery *pQuery, SQueryRunti
setCtxTagColumnInfo
(
pQuery
,
pRuntimeEnv
->
pCtx
);
// for loading block data in memory
// assert(vnodeList[pMeterObj->vnode].cfg.rowsInFileBlock == pMeterObj->pointsPerFileBlock);
// assert(vnodeList[pMeterObj->vnode].cfg.rowsInFileBlock == pMeterObj->pointsPerFileBlock);
return
TSDB_CODE_SUCCESS
;
_error_clean:
_error_clean:
tfree
(
pRuntimeEnv
->
resultInfo
);
tfree
(
pRuntimeEnv
->
pCtx
);
...
...
@@ -1725,7 +1724,7 @@ bool needSupplementaryScan(SQuery *pQuery) {
/////////////////////////////////////////////////////////////////////////////////////////////
void
doGetAlignedIntervalQueryRangeImpl
(
SQuery
*
pQuery
,
int64_t
key
,
int64_t
keyFirst
,
int64_t
keyLast
,
int64_t
*
realSkey
,
int64_t
*
realEkey
,
STimeWindow
*
win
)
{
int64_t
*
realSkey
,
int64_t
*
realEkey
,
STimeWindow
*
win
)
{
assert
(
key
>=
keyFirst
&&
key
<=
keyLast
&&
pQuery
->
slidingTime
<=
pQuery
->
intervalTime
);
win
->
skey
=
taosGetIntervalStartTimestamp
(
key
,
pQuery
->
slidingTime
,
pQuery
->
slidingTimeUnit
,
pQuery
->
precision
);
...
...
@@ -1782,8 +1781,8 @@ static bool doGetQueryPos(TSKEY key, SQInfo *pQInfo, SPointInterpoSupporter *pPo
#endif
}
static
bool
doSetDataInfo
(
SQInfo
*
pQInfo
,
SPointInterpoSupporter
*
pPointInterpSupporter
,
void
*
pMeterObj
,
TSKEY
nextKey
)
{
static
bool
doSetDataInfo
(
SQInfo
*
pQInfo
,
SPointInterpoSupporter
*
pPointInterpSupporter
,
void
*
pMeterObj
,
TSKEY
nextKey
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
...
...
@@ -1793,8 +1792,8 @@ static bool doSetDataInfo(SQInfo *pQInfo, SPointInterpoSupporter *pPointInterpSu
* the query range is existed, so set them both the value of nextKey
*/
if
(
pQuery
->
window
.
skey
!=
pQuery
->
window
.
ekey
)
{
assert
(
pQuery
->
window
.
skey
>=
pQuery
->
window
.
ekey
&&
!
QUERY_IS_ASC_QUERY
(
pQuery
)
&&
nextKey
>=
pQuery
->
window
.
ekey
&&
nextKey
<=
pQuery
->
window
.
skey
);
assert
(
pQuery
->
window
.
skey
>=
pQuery
->
window
.
ekey
&&
!
QUERY_IS_ASC_QUERY
(
pQuery
)
&&
nextKey
>=
pQuery
->
window
.
ekey
&&
nextKey
<=
pQuery
->
window
.
skey
);
pQuery
->
window
.
skey
=
nextKey
;
pQuery
->
window
.
ekey
=
nextKey
;
...
...
@@ -1896,7 +1895,7 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) {
}
assert
(
0
);
// pQuery->pointsOffset = pQuery->pointsToRead;
// pQuery->pointsOffset = pQuery->pointsToRead;
}
/*
...
...
@@ -1960,8 +1959,8 @@ static void changeExecuteScanOrder(SQuery *pQuery, bool metricQuery) {
if
(
isPointInterpoQuery
(
pQuery
)
&&
pQuery
->
intervalTime
==
0
)
{
if
(
!
QUERY_IS_ASC_QUERY
(
pQuery
))
{
dTrace
(
msg
,
GET_QINFO_ADDR
(
pQuery
),
"interp"
,
pQuery
->
order
.
order
,
TSQL_SO_ASC
,
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
pQuery
->
window
.
ekey
,
pQuery
->
window
.
skey
);
dTrace
(
msg
,
GET_QINFO_ADDR
(
pQuery
),
"interp"
,
pQuery
->
order
.
order
,
TSQL_SO_ASC
,
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
pQuery
->
window
.
ekey
,
pQuery
->
window
.
skey
);
SWAP
(
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
TSKEY
);
}
...
...
@@ -1972,8 +1971,8 @@ static void changeExecuteScanOrder(SQuery *pQuery, bool metricQuery) {
if
(
pQuery
->
intervalTime
==
0
)
{
if
(
onlyFirstQuery
(
pQuery
))
{
if
(
!
QUERY_IS_ASC_QUERY
(
pQuery
))
{
dTrace
(
msg
,
GET_QINFO_ADDR
(
pQuery
),
"only-first"
,
pQuery
->
order
.
order
,
TSQL_SO_ASC
,
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
pQuery
->
window
.
ekey
,
pQuery
->
window
.
skey
);
dTrace
(
msg
,
GET_QINFO_ADDR
(
pQuery
),
"only-first"
,
pQuery
->
order
.
order
,
TSQL_SO_ASC
,
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
pQuery
->
window
.
ekey
,
pQuery
->
window
.
skey
);
SWAP
(
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
TSKEY
);
}
...
...
@@ -1981,8 +1980,8 @@ static void changeExecuteScanOrder(SQuery *pQuery, bool metricQuery) {
pQuery
->
order
.
order
=
TSQL_SO_ASC
;
}
else
if
(
onlyLastQuery
(
pQuery
))
{
if
(
QUERY_IS_ASC_QUERY
(
pQuery
))
{
dTrace
(
msg
,
GET_QINFO_ADDR
(
pQuery
),
"only-last"
,
pQuery
->
order
.
order
,
TSQL_SO_DESC
,
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
pQuery
->
window
.
ekey
,
pQuery
->
window
.
skey
);
dTrace
(
msg
,
GET_QINFO_ADDR
(
pQuery
),
"only-last"
,
pQuery
->
order
.
order
,
TSQL_SO_DESC
,
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
pQuery
->
window
.
ekey
,
pQuery
->
window
.
skey
);
SWAP
(
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
TSKEY
);
}
...
...
@@ -1994,8 +1993,8 @@ static void changeExecuteScanOrder(SQuery *pQuery, bool metricQuery) {
if
(
metricQuery
)
{
if
(
onlyFirstQuery
(
pQuery
))
{
if
(
!
QUERY_IS_ASC_QUERY
(
pQuery
))
{
dTrace
(
msg
,
GET_QINFO_ADDR
(
pQuery
),
"only-first stable"
,
pQuery
->
order
.
order
,
TSQL_SO_ASC
,
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
pQuery
->
window
.
ekey
,
pQuery
->
window
.
skey
);
dTrace
(
msg
,
GET_QINFO_ADDR
(
pQuery
),
"only-first stable"
,
pQuery
->
order
.
order
,
TSQL_SO_ASC
,
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
pQuery
->
window
.
ekey
,
pQuery
->
window
.
skey
);
SWAP
(
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
TSKEY
);
}
...
...
@@ -2003,8 +2002,8 @@ static void changeExecuteScanOrder(SQuery *pQuery, bool metricQuery) {
pQuery
->
order
.
order
=
TSQL_SO_ASC
;
}
else
if
(
onlyLastQuery
(
pQuery
))
{
if
(
QUERY_IS_ASC_QUERY
(
pQuery
))
{
dTrace
(
msg
,
GET_QINFO_ADDR
(
pQuery
),
"only-last stable"
,
pQuery
->
order
.
order
,
TSQL_SO_DESC
,
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
pQuery
->
window
.
ekey
,
pQuery
->
window
.
skey
);
dTrace
(
msg
,
GET_QINFO_ADDR
(
pQuery
),
"only-last stable"
,
pQuery
->
order
.
order
,
TSQL_SO_DESC
,
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
pQuery
->
window
.
ekey
,
pQuery
->
window
.
skey
);
SWAP
(
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
TSKEY
);
}
...
...
@@ -2081,7 +2080,7 @@ static void doSetInterpVal(SQLFunctionCtx *pCtx, TSKEY ts, int16_t type, int32_t
*/
void
pointInterpSupporterSetData
(
SQInfo
*
pQInfo
,
SPointInterpoSupporter
*
pPointInterpSupport
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
// not point interpolation query, abort
if
(
!
isPointInterpoQuery
(
pQuery
))
{
...
...
@@ -2154,7 +2153,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI
pInterpInfo
->
pInterpDetail
=
calloc
(
1
,
sizeof
(
SInterpInfoDetail
));
SInterpInfoDetail
*
pInterpDetail
=
pInterpInfo
->
pInterpDetail
;
// int32_t type = GET_COLUMN_TYPE(pQuery, i);
// int32_t type = GET_COLUMN_TYPE(pQuery, i);
int32_t
type
=
0
;
assert
(
0
);
...
...
@@ -2188,7 +2187,7 @@ void pointInterpSupporterInit(SQuery *pQuery, SPointInterpoSupporter *pInterpoSu
len
+=
pQuery
->
colList
[
i
].
info
.
bytes
;
}
// assert(PRIMARY_TSCOL_LOADED(pQuery));
// assert(PRIMARY_TSCOL_LOADED(pQuery));
void
*
prev
=
calloc
(
1
,
len
);
void
*
next
=
calloc
(
1
,
len
);
...
...
@@ -2296,7 +2295,7 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) {
pQInfo
->
pTableList
=
NULL
;
}
// tSidSetDestroy(&pQInfo->pSidSet);
// tSidSetDestroy(&pQInfo->pSidSet);
if
(
pQInfo
->
pTableDataInfo
!=
NULL
)
{
size_t
num
=
taosHashGetSize
(
pQInfo
->
pTableList
);
...
...
@@ -2311,19 +2310,19 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) {
int32_t
vnodeSTableQueryPrepare
(
SQInfo
*
pQInfo
,
SQuery
*
pQuery
,
void
*
param
)
{
if
((
QUERY_IS_ASC_QUERY
(
pQuery
)
&&
(
pQuery
->
window
.
skey
>
pQuery
->
window
.
ekey
))
||
(
!
QUERY_IS_ASC_QUERY
(
pQuery
)
&&
(
pQuery
->
window
.
ekey
>
pQuery
->
window
.
skey
)))
{
dTrace
(
"QInfo:%p no result in time range %"
PRId64
"-%"
PRId64
", order %d"
,
pQInfo
,
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
pQuery
->
order
.
order
);
dTrace
(
"QInfo:%p no result in time range %"
PRId64
"-%"
PRId64
", order %d"
,
pQInfo
,
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
pQuery
->
order
.
order
);
sem_post
(
&
pQInfo
->
dataReady
);
// pQInfo->over = 1;
// pQInfo->over = 1;
return
TSDB_CODE_SUCCESS
;
}
pQuery
->
status
=
0
;
pQInfo
->
rec
=
(
SResultRec
)
{
0
};
pQuery
->
rec
=
(
SResultRec
)
{
0
};
pQInfo
->
rec
=
(
SResultRec
){
0
};
pQuery
->
rec
=
(
SResultRec
){
0
};
changeExecuteScanOrder
(
pQuery
,
true
);
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
...
...
@@ -2342,7 +2341,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
// get one queried meter
assert
(
0
);
// SMeterObj *pMeter = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pSids[0]->sid);
// SMeterObj *pMeter = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pSids[0]->sid);
pRuntimeEnv
->
pTSBuf
=
param
;
pRuntimeEnv
->
cur
.
vnodeIndex
=
-
1
;
...
...
@@ -2354,12 +2353,12 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
}
assert
(
0
);
// int32_t ret = setupQueryRuntimeEnv(pMeter, pQuery, &pQInfo->runtimeEnv, pTagSchemaInfo, TSQL_SO_ASC, true);
// if (ret != TSDB_CODE_SUCCESS) {
// return ret;
// }
// int32_t ret = setupQueryRuntimeEnv(pMeter, pQuery, &pQInfo->runtimeEnv, pTagSchemaInfo, TSQL_SO_ASC, true);
// if (ret != TSDB_CODE_SUCCESS) {
// return ret;
// }
// tSidSetSort(pQInfo->pSidSet);
// tSidSetSort(pQInfo->pSidSet);
int32_t
size
=
getInitialPageNum
(
pQInfo
);
int32_t
ret
=
createDiskbasedResultBuffer
(
&
pRuntimeEnv
->
pResultBuf
,
size
,
pQuery
->
rowSize
);
...
...
@@ -2388,10 +2387,10 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
cond
.
colList
=
*
pQuery
->
colList
;
SArray
*
sa
=
taosArrayInit
(
1
,
POINTER_BYTES
);
for
(
int32_t
i
=
0
;
i
<
pQInfo
->
pSidSet
->
numOfSid
s
;
++
i
)
{
// SMeterObj *p1 = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pSids[i]->sid);
// taosArrayPush(sa, &p1);
}
// for(int32_t i = 0; i < pQInfo->pSidSet->numOfTable
s; ++i) {
// SMeterObj *p1 = getMeterObj(pQInfo->pTableList, pQInfo->pSidSet->pSids[i]->sid);
// taosArrayPush(sa, &p1);
//
}
SArray
*
cols
=
taosArrayInit
(
pQuery
->
numOfCols
,
sizeof
(
pQuery
->
colList
[
0
]));
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfCols
;
++
i
)
{
...
...
@@ -2405,8 +2404,8 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
pQuery
->
interpoType
=
TSDB_INTERPO_NONE
;
}
TSKEY
revisedStime
=
taosGetIntervalStartTimestamp
(
pQuery
->
window
.
skey
,
pQuery
->
intervalTime
,
pQuery
->
slidingTimeUnit
,
pQuery
->
precision
);
TSKEY
revisedStime
=
taosGetIntervalStartTimestamp
(
pQuery
->
window
.
skey
,
pQuery
->
intervalTime
,
pQuery
->
slidingTimeUnit
,
pQuery
->
precision
);
taosInitInterpoInfo
(
&
pRuntimeEnv
->
interpoInfo
,
pQuery
->
order
.
order
,
revisedStime
,
0
,
0
);
pRuntimeEnv
->
stableQuery
=
true
;
...
...
@@ -2532,14 +2531,14 @@ static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow) {
pTimeWindow
->
ekey
=
pTimeWindow
->
skey
+
(
pQuery
->
intervalTime
-
1
);
}
SArray
*
loadDataBlockOnDemand
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SDataBlockInfo
*
pBlockInfo
,
SDataStatis
**
pStatis
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SArray
*
loadDataBlockOnDemand
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SDataBlockInfo
*
pBlockInfo
,
SDataStatis
**
pStatis
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
tsdb_query_handle_t
pQueryHandle
=
pRuntimeEnv
->
pQueryHandle
;
uint32_t
r
=
0
;
SArray
*
pDataBlock
=
NULL
;
// STimeWindow *w = &pQueryHandle->window;
// STimeWindow *w = &pQueryHandle->window;
if
(
pQuery
->
numOfFilterCols
>
0
)
{
r
=
BLK_DATA_ALL_NEEDED
;
...
...
@@ -2548,7 +2547,7 @@ SArray* loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBl
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
pBase
.
functionId
;
int32_t
colId
=
pQuery
->
pSelectExpr
[
i
].
pBase
.
colInfo
.
colId
;
// r |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], w->skey, w->ekey, colId);
// r |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], w->skey, w->ekey, colId);
}
if
(
pRuntimeEnv
->
pTSBuf
>
0
||
isIntervalQuery
(
pQuery
))
{
...
...
@@ -2684,8 +2683,7 @@ static void updatelastkey(SQuery *pQuery, STableQueryInfo *pTableQInfo) { pTable
* set tag value in SQLFunctionCtx
* e.g.,tag information into input buffer
*/
static
void
doSetTagValueInParam
(
SColumnModel
*
pTagSchema
,
int32_t
tagColIdx
,
void
*
pMeterSidInfo
,
tVariant
*
param
)
{
static
void
doSetTagValueInParam
(
SColumnModel
*
pTagSchema
,
int32_t
tagColIdx
,
void
*
pMeterSidInfo
,
tVariant
*
param
)
{
assert
(
tagColIdx
>=
0
);
#if 0
int16_t offset = getColumnModelOffset(pTagSchema, tagColIdx);
...
...
@@ -2886,9 +2884,9 @@ void UNUSED_FUNC displayInterResult(SData **pdata, SQuery *pQuery, int32_t numOf
}
typedef
struct
SCompSupporter
{
STableDataInfo
**
pTableDataInfo
;
STableDataInfo
**
pTableDataInfo
;
int32_t
*
position
;
SQInfo
*
pQInfo
;
SQInfo
*
pQInfo
;
}
SCompSupporter
;
int32_t
tableResultComparFn
(
const
void
*
pLeft
,
const
void
*
pRight
,
void
*
param
)
{
...
...
@@ -2942,7 +2940,7 @@ int32_t mergeMetersResultToOneGroups(SQInfo *pQInfo) {
int32_t
end
=
pQInfo
->
pSidSet
->
starterPos
[
pQInfo
->
subgroupIdx
+
1
];
assert
(
0
);
// ret = doMergeMetersResultsToGroupRes(pQInfo, pQuery, pRuntimeEnv, pQInfo->pTableDataInfo, start, end);
// ret = doMergeMetersResultsToGroupRes(pQInfo, pQuery, pRuntimeEnv, pQInfo->pTableDataInfo, start, end);
if
(
ret
<
0
)
{
// not enough disk space to save the data into disk
return
-
1
;
}
...
...
@@ -2975,7 +2973,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
// set current query completed
if
(
pQInfo
->
numOfGroupResultPages
==
0
&&
pQInfo
->
subgroupIdx
==
pQInfo
->
pSidSet
->
numOfSubSet
)
{
pQInfo
->
tableIndex
=
pQInfo
->
pSidSet
->
numOfSid
s
;
// pQInfo->tableIndex = pQInfo->pSidSet->numOfTable
s;
return
;
}
}
...
...
@@ -3040,8 +3038,8 @@ int64_t getNumOfResultWindowRes(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pW
}
int32_t
doMergeMetersResultsToGroupRes
(
SQInfo
*
pQInfo
,
STableDataInfo
*
pTableDataInfo
,
int32_t
start
,
int32_t
end
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
tFilePage
**
buffer
=
(
tFilePage
**
)
pQuery
->
sdata
;
int32_t
*
posList
=
calloc
((
end
-
start
),
sizeof
(
int32_t
));
...
...
@@ -3107,13 +3105,13 @@ int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInfo *pTableDat
doMerge
(
pRuntimeEnv
,
ts
,
pWindowRes
,
true
);
}
else
{
// copy data to disk buffer
assert
(
0
);
// if (buffer[0]->numOfElems == pQuery->pointsToRead) {
// if (flushFromResultBuf(pQInfo) != TSDB_CODE_SUCCESS) {
// return -1;
// }
// if (buffer[0]->numOfElems == pQuery->pointsToRead) {
// if (flushFromResultBuf(pQInfo) != TSDB_CODE_SUCCESS) {
// return -1;
// }
// resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo);
// }
// resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo);
// }
doMerge
(
pRuntimeEnv
,
ts
,
pWindowRes
,
false
);
buffer
[
0
]
->
numOfElems
+=
1
;
...
...
@@ -3169,8 +3167,8 @@ int32_t doMergeMetersResultsToGroupRes(SQInfo *pQInfo, STableDataInfo *pTableDat
}
int32_t
flushFromResultBuf
(
SQInfo
*
pQInfo
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SDiskbasedResultBuf
*
pResultBuf
=
pRuntimeEnv
->
pResultBuf
;
int32_t
capacity
=
(
DEFAULT_INTERN_BUF_SIZE
-
sizeof
(
tFilePage
))
/
pQuery
->
rowSize
;
...
...
@@ -3322,7 +3320,7 @@ void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTa
void
resetCtxOutputBuf
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
// int32_t rows = pRuntimeEnv->pTabObj->pointsPerFileBlock;
// int32_t rows = pRuntimeEnv->pTabObj->pointsPerFileBlock;
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutputCols
;
++
i
)
{
SQLFunctionCtx
*
pCtx
=
&
pRuntimeEnv
->
pCtx
[
i
];
...
...
@@ -3342,7 +3340,7 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
}
assert
(
0
);
// memset(pQuery->sdata[i]->data, 0, (size_t)pQuery->pSelectExpr[i].resBytes * rows);
// memset(pQuery->sdata[i]->data, 0, (size_t)pQuery->pSelectExpr[i].resBytes * rows);
}
initCtxOutputBuf
(
pRuntimeEnv
);
...
...
@@ -3397,7 +3395,7 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) {
pQuery
->
limit
.
offset
-=
pQuery
->
rec
.
pointsRead
;
pQuery
->
rec
.
pointsRead
=
0
;
// pQuery->pointsOffset = pQuery->rec.pointsToRead; // clear all data in result buffer
// pQuery->pointsOffset = pQuery->rec.pointsToRead; // clear all data in result buffer
resetCtxOutputBuf
(
pRuntimeEnv
);
...
...
@@ -3411,7 +3409,7 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) {
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
pBase
.
functionId
;
int32_t
bytes
=
pRuntimeEnv
->
pCtx
[
i
].
outputBytes
;
assert
(
0
);
// memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->pointsRead * bytes);
// memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->pointsRead * bytes);
pRuntimeEnv
->
pCtx
[
i
].
aOutputBuf
+=
bytes
*
numOfSkip
;
if
(
functionId
==
TSDB_FUNC_DIFF
||
functionId
==
TSDB_FUNC_TOP
||
functionId
==
TSDB_FUNC_BOTTOM
)
{
...
...
@@ -3597,7 +3595,8 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
doSingleMeterSupplementScan
(
pRuntimeEnv
);
// update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query during supplementary scan
// update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query during
// supplementary scan
pQuery
->
lastKey
=
lkey
;
pQuery
->
window
.
ekey
=
ekey
;
...
...
@@ -3659,7 +3658,10 @@ STableQueryInfo *createMeterQueryInfo(SQInfo *pQInfo, int32_t sid, TSKEY skey, T
STableQueryInfo
*
pTableQueryInfo
=
calloc
(
1
,
sizeof
(
STableQueryInfo
));
pTableQueryInfo
->
win
=
(
STimeWindow
)
{.
skey
=
skey
,
.
ekey
=
ekey
,};
pTableQueryInfo
->
win
=
(
STimeWindow
){
.
skey
=
skey
,
.
ekey
=
ekey
,
};
pTableQueryInfo
->
lastKey
=
skey
;
pTableQueryInfo
->
sid
=
sid
;
...
...
@@ -3716,8 +3718,8 @@ void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *p
* @param pRuntimeEnv
* @param pDataBlockInfoEx
*/
void
setExecutionContext
(
SQInfo
*
pQInfo
,
STableQueryInfo
*
pTableQueryInfo
,
int32_t
meterIdx
,
int32_t
groupIdx
,
TSKEY
nextKey
)
{
void
setExecutionContext
(
SQInfo
*
pQInfo
,
STableQueryInfo
*
pTableQueryInfo
,
int32_t
meterIdx
,
int32_t
groupIdx
,
TSKEY
nextKey
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SWindowResInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
windowResInfo
;
int32_t
GROUPRESULTID
=
1
;
...
...
@@ -3774,7 +3776,7 @@ int32_t setAdditionalInfo(SQInfo *pQInfo, int32_t meterIdx, STableQueryInfo *pTa
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
assert
(
pTableQueryInfo
->
lastKey
>
0
);
// vnodeSetTagValueInParam(pQInfo->pSidSet, pRuntimeEnv, pQInfo->pMeterSidExtInfo[meterIdx]);
// vnodeSetTagValueInParam(pQInfo->pSidSet, pRuntimeEnv, pQInfo->pMeterSidExtInfo[meterIdx]);
// both the master and supplement scan needs to set the correct ts comp start position
if
(
pRuntimeEnv
->
pTSBuf
!=
NULL
)
{
...
...
@@ -3924,13 +3926,13 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde
* current output space is not enough to keep all the result data of this group, only copy partial results
* to SQuery object's result buffer
*/
// if (numOfRowsToCopy > pQuery->pointsToRead - numOfResult) {
// numOfRowsToCopy = pQuery->pointsToRead - numOfResult;
// pQInfo->offset += numOfRowsToCopy;
// } else {
// pQInfo->offset = 0;
// pQInfo->subgroupIdx += 1;
// }
// if (numOfRowsToCopy > pQuery->pointsToRead - numOfResult) {
// numOfRowsToCopy = pQuery->pointsToRead - numOfResult;
// pQInfo->offset += numOfRowsToCopy;
// } else {
// pQInfo->offset = 0;
// pQInfo->subgroupIdx += 1;
// }
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutputCols
;
++
j
)
{
int32_t
size
=
pRuntimeEnv
->
pCtx
[
j
].
outputBytes
;
...
...
@@ -3942,9 +3944,9 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde
numOfResult
+=
numOfRowsToCopy
;
assert
(
0
);
// if (numOfResult == pQuery->rec.pointsToRead) {
// break;
// }
// if (numOfResult == pQuery->rec.pointsToRead) {
// break;
// }
}
dTrace
(
"QInfo:%p copy data to SQuery buf completed"
,
GET_QINFO_ADDR
(
pQuery
));
...
...
@@ -3971,7 +3973,7 @@ void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResult *result) {
int32_t
numOfResult
=
doCopyToSData
(
pQInfo
,
result
,
orderType
);
pQuery
->
rec
.
pointsRead
+=
numOfResult
;
// assert(pQuery->rec.pointsRead <= pQuery->pointsToRead);
// assert(pQuery->rec.pointsRead <= pQuery->pointsToRead);
}
static
void
updateWindowResNumOfRes
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
STableDataInfo
*
pTableDataInfo
)
{
...
...
@@ -3989,9 +3991,8 @@ static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableDataInf
}
}
void
stableApplyFunctionsOnBlock_
(
SQInfo
*
pQInfo
,
STableDataInfo
*
pTableDataInfo
,
SDataBlockInfo
*
pDataBlockInfo
,
SDataStatis
*
pStatis
,
SArray
*
pDataBlock
,
__block_search_fn_t
searchFn
)
{
void
stableApplyFunctionsOnBlock_
(
SQInfo
*
pQInfo
,
STableDataInfo
*
pTableDataInfo
,
SDataBlockInfo
*
pDataBlockInfo
,
SDataStatis
*
pStatis
,
SArray
*
pDataBlock
,
__block_search_fn_t
searchFn
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
STableQueryInfo
*
pTableQueryInfo
=
pTableDataInfo
->
pTableQInfo
;
...
...
@@ -3999,7 +4000,7 @@ void stableApplyFunctionsOnBlock_(SQInfo *pQInfo, STableDataInfo *pTableDataInfo
int32_t
numOfRes
=
0
;
if
(
pQuery
->
numOfFilterCols
>
0
||
pRuntimeEnv
->
pTSBuf
!=
NULL
)
{
// numOfRes = rowwiseApplyAllFunctions(pRuntimeEnv, &forwardStep, pFields, pDataBlockInfo, pWindowResInfo);
// numOfRes = rowwiseApplyAllFunctions(pRuntimeEnv, &forwardStep, pFields, pDataBlockInfo, pWindowResInfo);
}
else
{
numOfRes
=
blockwiseApplyAllFunctions
(
pRuntimeEnv
,
pStatis
,
pDataBlockInfo
,
pWindowResInfo
,
searchFn
,
pDataBlock
);
}
...
...
@@ -4067,9 +4068,11 @@ bool vnodeHasRemainResults(void *handle) {
if
(
Q_STATUS_EQUAL
(
pQuery
->
status
,
QUERY_COMPLETED
|
QUERY_NO_DATA_TO_CHECK
))
{
TSKEY
ekey
=
taosGetRevisedEndKey
(
pQuery
->
window
.
ekey
,
pQuery
->
order
.
order
,
pQuery
->
intervalTime
,
pQuery
->
slidingTimeUnit
,
pQuery
->
precision
);
// int32_t numOfTotal = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY *)pRuntimeEnv->pInterpoBuf[0]->data,
// remain, pQuery->intervalTime, ekey, pQuery->pointsToRead);
// return numOfTotal > 0;
// int32_t numOfTotal = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY
// *)pRuntimeEnv->pInterpoBuf[0]->data,
// remain, pQuery->intervalTime, ekey,
// pQuery->pointsToRead);
// return numOfTotal > 0;
assert
(
0
);
return
false
;
}
...
...
@@ -4214,7 +4217,6 @@ int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage
}
}
#endif
}
void
vnodePrintQueryStatistics
(
SQInfo
*
pQInfo
)
{
...
...
@@ -4262,49 +4264,47 @@ void vnodePrintQueryStatistics(SQInfo *pQInfo) {
#endif
}
int32_t
vnodeQueryTablePrepare
(
SQInfo
*
pQInfo
,
void
*
p
MeterObj
,
void
*
p
aram
)
{
int32_t
vnodeQueryTablePrepare
(
SQInfo
*
pQInfo
,
void
*
param
)
{
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
//only the successful complete requries the sem_post/over = 1 operations.
//
only the successful complete requries the sem_post/over = 1 operations.
if
((
QUERY_IS_ASC_QUERY
(
pQuery
)
&&
(
pQuery
->
window
.
skey
>
pQuery
->
window
.
ekey
))
||
(
!
QUERY_IS_ASC_QUERY
(
pQuery
)
&&
(
pQuery
->
window
.
ekey
>
pQuery
->
window
.
skey
)))
{
dTrace
(
"QInfo:%p no result in time range %"
PRId64
"-%"
PRId64
", order %d"
,
pQInfo
,
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
pQuery
->
order
.
order
);
dTrace
(
"QInfo:%p no result in time range %"
PRId64
"-%"
PRId64
", order %d"
,
pQInfo
,
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
pQuery
->
order
.
order
);
sem_post
(
&
pQInfo
->
dataReady
);
// pQInfo->over = 1;
// pQInfo->over = 1;
return
TSDB_CODE_SUCCESS
;
}
setScanLimitationByResultBuffer
(
pQuery
);
changeExecuteScanOrder
(
pQuery
,
false
);
// pQInfo->over = 0;
pQInfo
->
rec
=
(
SResultRec
)
{
0
};
// pQuery->pointsRead = 0;
pQInfo
->
rec
=
(
SResultRec
){
0
};
// dataInCache requires lastKey value
pQuery
->
lastKey
=
pQuery
->
window
.
skey
;
STsdbQueryCond
cond
=
{
0
};
cond
.
twindow
=
(
STimeWindow
){.
skey
=
pQuery
->
window
.
skey
,
.
ekey
=
pQuery
->
window
.
ekey
};
cond
.
order
=
pQuery
->
order
.
order
;
cond
.
twindow
=
(
STimeWindow
)
{.
skey
=
pQuery
->
window
.
skey
,
.
ekey
=
pQuery
->
window
.
ekey
};
cond
.
order
=
pQuery
->
order
.
order
;
cond
.
colList
=
*
pQuery
->
colList
;
SArray
*
sa
=
taosArrayInit
(
1
,
POINTER_BYTES
);
taosArrayPush
(
sa
,
&
pMeterObj
);
SArray
*
cols
=
taosArrayInit
(
pQuery
->
numOfCols
,
sizeof
(
pQuery
->
colList
[
0
]));
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfCols
;
++
i
)
{
taosArrayPush
(
cols
,
&
pQuery
->
colList
[
i
]);
}
SArray
*
sa
=
taosArrayInit
(
1
,
sizeof
(
int16_t
));
taosArrayPush
(
sa
,
&
pQInfo
->
pSidSet
->
pSids
[
0
]
->
sid
);
pQInfo
->
runtimeEnv
.
pQueryHandle
=
tsdbQueryByTableId
(
&
cond
,
sa
,
cols
);
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
pRuntimeEnv
->
pQuery
=
pQuery
;
pRuntimeEnv
->
pTabObj
=
pMeterObj
;
pRuntimeEnv
->
pTSBuf
=
param
;
pRuntimeEnv
->
cur
.
vnodeIndex
=
-
1
;
...
...
@@ -4314,7 +4314,7 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *pMeterObj, void *param) {
}
// create runtime environment
code
=
setupQueryRuntimeEnv
(
pMeterObj
,
pQuery
,
&
pQInfo
->
runtimeEnv
,
NULL
,
pQuery
->
order
.
order
,
false
);
code
=
setupQueryRuntimeEnv
(
&
pQInfo
->
runtimeEnv
,
NULL
,
pQuery
->
order
.
order
,
false
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
@@ -4338,7 +4338,6 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *pMeterObj, void *param) {
initWindowResInfo
(
&
pRuntimeEnv
->
windowResInfo
,
pRuntimeEnv
,
rows
,
4096
,
type
);
}
/* query on single table */
setQueryStatus
(
pQuery
,
QUERY_NOT_COMPLETED
);
SPointInterpoSupporter
interpInfo
=
{
0
};
...
...
@@ -4371,7 +4370,7 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *pMeterObj, void *param) {
int64_t
rs
=
taosGetIntervalStartTimestamp
(
pQuery
->
window
.
skey
,
pQuery
->
intervalTime
,
pQuery
->
slidingTimeUnit
,
pQuery
->
precision
);
taosInitInterpoInfo
(
&
pRuntimeEnv
->
interpoInfo
,
pQuery
->
order
.
order
,
rs
,
0
,
0
);
// allocMemForInterpo(pQInfo, pQuery, pMeterObj);
// allocMemForInterpo(pQInfo, pQuery, pMeterObj);
if
(
!
isPointInterpoQuery
(
pQuery
))
{
// assert(pQuery->pos >= 0 && pQuery->slot >= 0);
...
...
@@ -4384,7 +4383,6 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *pMeterObj, void *param) {
return
TSDB_CODE_SUCCESS
;
}
static
bool
isGroupbyEachTable
(
SSqlGroupbyExpr
*
pGroupbyExpr
,
tSidSet
*
pSidset
)
{
if
(
pGroupbyExpr
==
NULL
||
pGroupbyExpr
->
numOfGroupCols
==
0
)
{
return
false
;
...
...
@@ -4393,7 +4391,7 @@ static bool isGroupbyEachTable(SSqlGroupbyExpr *pGroupbyExpr, tSidSet *pSidset)
for
(
int32_t
i
=
0
;
i
<
pGroupbyExpr
->
numOfGroupCols
;
++
i
)
{
SColIndexEx
*
pColIndex
=
&
pGroupbyExpr
->
columnInfo
[
i
];
if
(
pColIndex
->
flag
==
TSDB_COL_TAG
)
{
assert
(
pSidset
->
numOfSid
s
==
pSidset
->
numOfSubSet
);
// assert(pSidset->numOfTable
s == pSidset->numOfSubSet);
return
true
;
}
}
...
...
@@ -4410,8 +4408,6 @@ static bool doCheckWithPrevQueryRange(SQuery *pQuery, TSKEY nextKey) {
return
true
;
}
static
void
enableExecutionForNextTable
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
...
...
@@ -4424,13 +4420,13 @@ static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) {
}
static
void
queryOnDataBlocks
(
SQInfo
*
pQInfo
,
STableDataInfo
*
pMeterDataInfo
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
// SMeterObj * pTempMeter = getMeterObj(pSupporter->pMetersHashTable, pSupporter->pMeterSidExtInfo[0]->sid);
// __block_search_fn_t searchFn = vnodeSearchKeyFunc[pTempMeter->searchAlgorithm];
// SMeterObj * pTempMeter = getMeterObj(pSupporter->pMetersHashTable, pSupporter->pMeterSidExtInfo[0]->sid);
// __block_search_fn_t searchFn = vnodeSearchKeyFunc[pTempMeter->searchAlgorithm];
// dTrace("QInfo:%p start to check data blocks in %d files", pQInfo, pVnodeFileInfo->numOfFiles);
// dTrace("QInfo:%p start to check data blocks in %d files", pQInfo, pVnodeFileInfo->numOfFiles);
tsdb_query_handle_t
*
pQueryHandle
=
pRuntimeEnv
->
pQueryHandle
;
while
(
tsdbNextDataBlock
(
pQueryHandle
))
{
...
...
@@ -4441,52 +4437,51 @@ static void queryOnDataBlocks(SQInfo *pQInfo, STableDataInfo *pMeterDataInfo) {
// prepare the STableDataInfo struct for each table
SDataBlockInfo
blockInfo
=
tsdbRetrieveDataBlockInfo
(
pQueryHandle
);
// SMeterObj * pMeterObj = getMeterObj(pSupporter->pMetersHashTable, blockInfo.sid);
// SMeterObj * pMeterObj = getMeterObj(pSupporter->pMetersHashTable, blockInfo.sid);
// pQInfo->pObj = pMeterObj;
// pRuntimeEnv->pMeterObj = pMeterObj;
// pQInfo->pObj = pMeterObj;
// pRuntimeEnv->pMeterObj = pMeterObj;
STableDataInfo
*
pTableDataInfo
=
NULL
;
// for (int32_t i = 0; i < pSupporter->pSidSet->numOfSid
s; ++i) {
// if (pMeterDataInfo[i].pMeterObj == pMeterObj) {
// pTableDataInfo = &pMeterDataInfo[i];
// break;
// }
// }
// for (int32_t i = 0; i < pSupporter->pSidSet->numOfTable
s; ++i) {
// if (pMeterDataInfo[i].pMeterObj == pMeterObj) {
// pTableDataInfo = &pMeterDataInfo[i];
// break;
// }
// }
assert
(
pTableDataInfo
!=
NULL
);
STableQueryInfo
*
pTableQueryInfo
=
pTableDataInfo
->
pTableQInfo
;
if
(
pTableDataInfo
->
pTableQInfo
==
NULL
)
{
// pTableDataInfo->pTableQInfo = createMeterQueryInfo(pQInfo, pMeterObj->sid, pQuery->skey, pQuery->ekey);
// pTableDataInfo->pTableQInfo = createMeterQueryInfo(pQInfo, pMeterObj->sid, pQuery->skey, pQuery->ekey);
}
restoreIntervalQueryRange
(
pRuntimeEnv
,
pTableQueryInfo
);
SDataStatis
*
pStatis
=
NULL
;
SArray
*
pDataBlock
=
loadDataBlockOnDemand
(
pRuntimeEnv
,
&
blockInfo
,
&
pStatis
);
SArray
*
pDataBlock
=
loadDataBlockOnDemand
(
pRuntimeEnv
,
&
blockInfo
,
&
pStatis
);
TSKEY
nextKey
=
blockInfo
.
window
.
ekey
;
if
(
pQuery
->
intervalTime
==
0
)
{
setExecutionContext
(
pQInfo
,
pTableQueryInfo
,
pTableDataInfo
->
tableIndex
,
pTableDataInfo
->
groupIdx
,
nextKey
);
setExecutionContext
(
pQInfo
,
pTableQueryInfo
,
pTableDataInfo
->
tableIndex
,
pTableDataInfo
->
groupIdx
,
nextKey
);
}
else
{
// interval query
setIntervalQueryRange
(
pTableQueryInfo
,
pQInfo
,
nextKey
);
int32_t
ret
=
setAdditionalInfo
(
pQInfo
,
pTableDataInfo
->
tableIndex
,
pTableQueryInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
// pQInfo->killed = 1;
// pQInfo->killed = 1;
return
;
}
}
// stableApplyFunctionsOnBlock_(pSupporter, pTableDataInfo, &blockInfo, pStatis, pDataBlock, searchFn);
// stableApplyFunctionsOnBlock_(pSupporter, pTableDataInfo, &blockInfo, pStatis, pDataBlock, searchFn);
}
}
static
bool
multimeterMultioutputHelper
(
SQInfo
*
pQInfo
,
bool
*
dataInDisk
,
bool
*
dataInCache
,
int32_t
index
,
int32_t
start
)
{
// SMeterSidExt
Info **pMeterSidExtInfo = pQInfo->pMeterSidExtInfo;
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
// STableId
Info **pMeterSidExtInfo = pQInfo->pMeterSidExtInfo;
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
&
pRuntimeEnv
->
pQuery
;
#if 0
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
...
...
@@ -4540,7 +4535,7 @@ static bool multimeterMultioutputHelper(SQInfo *pQInfo, bool *dataInDisk, bool *
static
int64_t
doCheckMetersInGroup
(
SQInfo
*
pQInfo
,
int32_t
index
,
int32_t
start
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
bool
dataInDisk
=
true
;
bool
dataInCache
=
true
;
...
...
@@ -4548,15 +4543,14 @@ static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start
return
0
;
}
SPointInterpoSupporter
pointInterpSupporter
=
{
0
};
pointInterpSupporterInit
(
pQuery
,
&
pointInterpSupporter
);
assert
(
0
);
// if (!normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &pointInterpSupporter, NULL)) {
// pointInterpSupporterDestroy(&pointInterpSupporter);
// return 0;
// }
// if (!normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &pointInterpSupporter, NULL)) {
// pointInterpSupporterDestroy(&pointInterpSupporter);
// return 0;
// }
/*
* here we set the value for before and after the specified time into the
...
...
@@ -4590,7 +4584,7 @@ static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start
* @param pQInfo
*/
static
void
vnodeSTableSeqProcessor
(
SQInfo
*
pQInfo
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
#if 0
SQuery* pQuery = pRuntimeEnv->pQuery;
...
...
@@ -4686,7 +4680,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
}
}
if (pSupporter->meterIdx >= pSids->numOf
Sid
s) {
if (pSupporter->meterIdx >= pSids->numOf
Table
s) {
return;
}
...
...
@@ -4752,7 +4746,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
// the limitation of output result is reached, set the query completed
if (doRevisedResultsByLimit(pQInfo)) {
pSupporter->meterIdx = pSupporter->pSidSet->numOf
Sid
s;
pSupporter->meterIdx = pSupporter->pSidSet->numOf
Table
s;
break;
}
...
...
@@ -4839,7 +4833,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
dTrace(
"QInfo %p vid:%d, numOfMeters:%d, index:%d, numOfGroups:%d, %d points returned, totalRead:%d totalReturn:%d,"
"next skey:%" PRId64 ", offset:%" PRId64,
pQInfo, vid, pSids->numOf
Sid
s, pSupporter->meterIdx, pSids->numOfSubSet, pQuery->pointsRead, pQInfo->pointsRead,
pQInfo, vid, pSids->numOf
Table
s, pSupporter->meterIdx, pSids->numOfSubSet, pQuery->pointsRead, pQInfo->pointsRead,
pQInfo->pointsReturned, pQuery->skey, pQuery->limit.offset);
#endif
}
...
...
@@ -4848,15 +4842,15 @@ static void doOrderedScan(SQInfo *pQInfo) {
SQuery
*
pQuery
=
&
pQInfo
->
runtimeEnv
.
pQuery
;
#if 0
// if (pQInfo->runtimeEnv. == NULL) {
// pSupporter->pMeterDataInfo = calloc(pSupporter->pSidSet->numOf
Sid
s, sizeof(STableDataInfo));
// pSupporter->pMeterDataInfo = calloc(pSupporter->pSidSet->numOf
Table
s, sizeof(STableDataInfo));
// }
S
MeterSidExt
Info **pMeterSidExtInfo = pSupporter->pMeterSidExtInfo;
S
TableId
Info **pMeterSidExtInfo = pSupporter->pMeterSidExtInfo;
tSidSet* pSidset = pSupporter->pSidSet;
int32_t groupId = 0;
for (int32_t i = 0; i < pSidset->numOf
Sid
s; ++i) { // load all meter meta info
for (int32_t i = 0; i < pSidset->numOf
Table
s; ++i) { // load all meter meta info
SMeterObj *pMeterObj = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[i]->sid);
if (pMeterObj == NULL) {
dError("QInfo:%p failed to find required sid:%d", pQInfo, pMeterSidExtInfo[i]->sid);
...
...
@@ -4886,14 +4880,14 @@ static void setupMeterQueryInfoForSupplementQuery(SQInfo *pQInfo) {
int32_t
num
=
taosHashGetSize
(
pQInfo
->
pTableList
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
// STableQueryInfo *pTableQueryInfo = pSupporter->pMeterDataInfo[i].pTableQInfo;
// changeMeterQueryInfoForSuppleQuery(pQuery, pTableQueryInfo, pSupporter->rawSKey, pSupporter->rawEKey);
// STableQueryInfo *pTableQueryInfo = pSupporter->pMeterDataInfo[i].pTableQInfo;
// changeMeterQueryInfoForSuppleQuery(pQuery, pTableQueryInfo, pSupporter->rawSKey, pSupporter->rawEKey);
}
}
static
void
doMultiMeterSupplementaryScan
(
SQInfo
*
pQInfo
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
if
(
!
needSupplementaryScan
(
pQuery
))
{
dTrace
(
"QInfo:%p no need to do supplementary scan, query completed"
,
pQInfo
);
...
...
@@ -4901,7 +4895,7 @@ static void doMultiMeterSupplementaryScan(SQInfo *pQInfo) {
}
SET_SUPPLEMENT_SCAN_FLAG
(
pRuntimeEnv
);
// disableFunctForSuppleScan(pSupporter, pQuery->order.order);
// disableFunctForSuppleScan(pSupporter, pQuery->order.order);
if
(
pRuntimeEnv
->
pTSBuf
!=
NULL
)
{
pRuntimeEnv
->
pTSBuf
->
cur
.
order
=
pRuntimeEnv
->
pTSBuf
->
cur
.
order
^
1u
;
...
...
@@ -4933,7 +4927,7 @@ static void doMultiMeterSupplementaryScan(SQInfo *pQInfo) {
}
static
void
vnodeMultiMeterQueryProcessor
(
SQInfo
*
pQInfo
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
if
(
pQInfo
->
subgroupIdx
>
0
)
{
...
...
@@ -4954,7 +4948,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
pQInfo
->
rec
.
pointsRead
+=
pQuery
->
rec
.
pointsRead
;
if
(
pQuery
->
rec
.
pointsRead
==
0
)
{
// vnodePrintQueryStatistics(pSupporter);
// vnodePrintQueryStatistics(pSupporter);
}
dTrace
(
"QInfo:%p points returned:%d, totalRead:%d totalReturn:%d"
,
pQInfo
,
pQuery
->
rec
.
pointsRead
,
...
...
@@ -5146,7 +5140,7 @@ static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) {
}
// load the data block for the next retrieve
// loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos);
// loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos);
if
(
Q_STATUS_EQUAL
(
pQuery
->
status
,
QUERY_RESBUF_FULL
))
{
break
;
}
...
...
@@ -5155,10 +5149,10 @@ static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) {
/* handle time interval query on single table */
static
void
vnodeSingleTableIntervalProcessor
(
SQInfo
*
pQInfo
)
{
// STable *pMeterObj = pQInfo->pObj;
// STable *pMeterObj = pQInfo->pObj;
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
(
pQInfo
->
runtimeEnv
);
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
(
pQInfo
->
runtimeEnv
);
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
int32_t
numOfInterpo
=
0
;
...
...
@@ -5187,8 +5181,8 @@ static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) {
}
numOfInterpo
=
0
;
pQuery
->
rec
.
pointsRead
=
vnodeQueryResultInterpolate
(
pQInfo
,
(
tFilePage
**
)
pQuery
->
sdata
,
(
tFilePage
**
)
pInterpoBuf
,
pQuery
->
rec
.
pointsRead
,
&
numOfInterpo
);
pQuery
->
rec
.
pointsRead
=
vnodeQueryResultInterpolate
(
pQInfo
,
(
tFilePage
**
)
pQuery
->
sdata
,
(
tFilePage
**
)
pInterpoBuf
,
pQuery
->
rec
.
pointsRead
,
&
numOfInterpo
);
dTrace
(
"QInfo: %p interpo completed, final:%d"
,
pQInfo
,
pQuery
->
rec
.
pointsRead
);
if
(
pQuery
->
rec
.
pointsRead
>
0
||
Q_STATUS_EQUAL
(
pQuery
->
status
,
QUERY_COMPLETED
|
QUERY_NO_DATA_TO_CHECK
))
{
...
...
@@ -5212,13 +5206,14 @@ static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) {
pQInfo
->
rec
.
pointsRead
+=
pQuery
->
rec
.
pointsRead
;
pQInfo
->
pointsInterpo
+=
numOfInterpo
;
// dTrace("%p vid:%d sid:%d id:%s, %d points returned %d points interpo, totalRead:%d totalInterpo:%d totalReturn:%d",
// pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, numOfInterpo,
// pQInfo->pointsRead - pQInfo->pointsInterpo, pQInfo->pointsInterpo, pQInfo->pointsReturned);
// dTrace("%p vid:%d sid:%d id:%s, %d points returned %d points interpo, totalRead:%d totalInterpo:%d
// totalReturn:%d",
// pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, numOfInterpo,
// pQInfo->pointsRead - pQInfo->pointsInterpo, pQInfo->pointsInterpo, pQInfo->pointsReturned);
}
void
qTableQuery
(
void
*
pReadMsg
)
{
// SQInfo *pQInfo = (SQInfo *)pReadMsg->ahandle;
void
qTableQuery
(
void
*
pReadMsg
)
{
// SQInfo *pQInfo = (SQInfo *)pReadMsg->ahandle;
#if 0
if (pQInfo == NULL) {
...
...
@@ -5395,3 +5390,740 @@ void qSuperTableQuery(void *pReadMsg) {
// vnodeDecRefCount(pQInfo);
#endif
}
static
int32_t
getColumnIndexInSource
(
SQueryTableMsg
*
pQueryTableMsg
,
SSqlFuncExprMsg
*
pExprMsg
)
{
int32_t
j
=
0
;
while
(
j
<
pQueryTableMsg
->
numOfCols
)
{
if
(
pExprMsg
->
colInfo
.
colId
==
pQueryTableMsg
->
colList
[
j
].
colId
)
{
break
;
}
j
+=
1
;
}
return
j
;
}
bool
vnodeValidateExprColumnInfo
(
SQueryTableMsg
*
pQueryTableMsg
,
SSqlFuncExprMsg
*
pExprMsg
)
{
int32_t
j
=
getColumnIndexInSource
(
pQueryTableMsg
,
pExprMsg
);
return
j
<
pQueryTableMsg
->
numOfCols
;
}
static
int32_t
validateQueryMeterMsg
(
SQueryTableMsg
*
pQueryTableMsg
)
{
if
(
pQueryTableMsg
->
intervalTime
<
0
)
{
dError
(
"qmsg:%p illegal value of aggTimeInterval %"
PRId64
""
,
pQueryTableMsg
,
pQueryTableMsg
->
intervalTime
);
return
-
1
;
}
if
(
pQueryTableMsg
->
numOfTagsCols
<
0
||
pQueryTableMsg
->
numOfTagsCols
>
TSDB_MAX_TAGS
+
1
)
{
dError
(
"qmsg:%p illegal value of numOfTagsCols %d"
,
pQueryTableMsg
,
pQueryTableMsg
->
numOfTagsCols
);
return
-
1
;
}
if
(
pQueryTableMsg
->
numOfCols
<=
0
||
pQueryTableMsg
->
numOfCols
>
TSDB_MAX_COLUMNS
)
{
dError
(
"qmsg:%p illegal value of numOfCols %d"
,
pQueryTableMsg
,
pQueryTableMsg
->
numOfCols
);
return
-
1
;
}
if
(
pQueryTableMsg
->
numOfTables
<=
0
)
{
dError
(
"qmsg:%p illegal value of numOfTables %d"
,
pQueryTableMsg
,
pQueryTableMsg
->
numOfTables
);
return
-
1
;
}
if
(
pQueryTableMsg
->
numOfGroupCols
<
0
)
{
dError
(
"qmsg:%p illegal value of numOfGroupbyCols %d"
,
pQueryTableMsg
,
pQueryTableMsg
->
numOfGroupCols
);
return
-
1
;
}
if
(
pQueryTableMsg
->
numOfOutputCols
>
TSDB_MAX_COLUMNS
||
pQueryTableMsg
->
numOfOutputCols
<=
0
)
{
dError
(
"qmsg:%p illegal value of output columns %d"
,
pQueryTableMsg
,
pQueryTableMsg
->
numOfOutputCols
);
return
-
1
;
}
if
(
pQueryTableMsg
->
tagLength
<
0
)
{
dError
(
"qmsg:%p illegal value of tag length %d"
,
pQueryTableMsg
,
pQueryTableMsg
->
tagLength
);
return
-
1
;
}
return
0
;
}
int32_t
convertQueryTableMsg
(
SQueryTableMsg
*
pQueryTableMsg
)
{
pQueryTableMsg
->
vnode
=
htons
(
pQueryTableMsg
->
vnode
);
pQueryTableMsg
->
numOfTables
=
htonl
(
pQueryTableMsg
->
numOfTables
);
#ifdef TSKEY32
pQueryTableMsg
->
skey
=
htonl
(
pQueryTableMsg
->
skey
);
pQueryTableMsg
->
ekey
=
htonl
(
pQueryTableMsg
->
ekey
);
#else
pQueryTableMsg
->
window
.
skey
=
htobe64
(
pQueryTableMsg
->
window
.
skey
);
pQueryTableMsg
->
window
.
ekey
=
htobe64
(
pQueryTableMsg
->
window
.
ekey
);
#endif
pQueryTableMsg
->
order
=
htons
(
pQueryTableMsg
->
order
);
pQueryTableMsg
->
orderColId
=
htons
(
pQueryTableMsg
->
orderColId
);
pQueryTableMsg
->
queryType
=
htons
(
pQueryTableMsg
->
queryType
);
pQueryTableMsg
->
intervalTime
=
htobe64
(
pQueryTableMsg
->
intervalTime
);
pQueryTableMsg
->
slidingTime
=
htobe64
(
pQueryTableMsg
->
slidingTime
);
pQueryTableMsg
->
numOfTagsCols
=
htons
(
pQueryTableMsg
->
numOfTagsCols
);
pQueryTableMsg
->
numOfCols
=
htons
(
pQueryTableMsg
->
numOfCols
);
pQueryTableMsg
->
numOfOutputCols
=
htons
(
pQueryTableMsg
->
numOfOutputCols
);
pQueryTableMsg
->
numOfGroupCols
=
htons
(
pQueryTableMsg
->
numOfGroupCols
);
pQueryTableMsg
->
tagLength
=
htons
(
pQueryTableMsg
->
tagLength
);
pQueryTableMsg
->
limit
=
htobe64
(
pQueryTableMsg
->
limit
);
pQueryTableMsg
->
offset
=
htobe64
(
pQueryTableMsg
->
offset
);
pQueryTableMsg
->
tsOffset
=
htonl
(
pQueryTableMsg
->
tsOffset
);
pQueryTableMsg
->
tsLen
=
htonl
(
pQueryTableMsg
->
tsLen
);
pQueryTableMsg
->
tsNumOfBlocks
=
htonl
(
pQueryTableMsg
->
tsNumOfBlocks
);
pQueryTableMsg
->
tsOrder
=
htonl
(
pQueryTableMsg
->
tsOrder
);
// query msg safety check
if
(
validateQueryMeterMsg
(
pQueryTableMsg
)
!=
0
)
{
return
TSDB_CODE_INVALID_QUERY_MSG
;
}
// STableIdInfo **pSids = NULL;
char
*
pMsg
=
(
char
*
)(
pQueryTableMsg
->
colList
)
+
sizeof
(
SColumnInfo
)
*
pQueryTableMsg
->
numOfCols
;
for
(
int32_t
col
=
0
;
col
<
pQueryTableMsg
->
numOfCols
;
++
col
)
{
pQueryTableMsg
->
colList
[
col
].
colId
=
htons
(
pQueryTableMsg
->
colList
[
col
].
colId
);
pQueryTableMsg
->
colList
[
col
].
type
=
htons
(
pQueryTableMsg
->
colList
[
col
].
type
);
pQueryTableMsg
->
colList
[
col
].
bytes
=
htons
(
pQueryTableMsg
->
colList
[
col
].
bytes
);
pQueryTableMsg
->
colList
[
col
].
numOfFilters
=
htons
(
pQueryTableMsg
->
colList
[
col
].
numOfFilters
);
assert
(
pQueryTableMsg
->
colList
[
col
].
type
>=
TSDB_DATA_TYPE_BOOL
&&
pQueryTableMsg
->
colList
[
col
].
type
<=
TSDB_DATA_TYPE_NCHAR
);
int32_t
numOfFilters
=
pQueryTableMsg
->
colList
[
col
].
numOfFilters
;
if
(
numOfFilters
>
0
)
{
pQueryTableMsg
->
colList
[
col
].
filters
=
calloc
(
numOfFilters
,
sizeof
(
SColumnFilterInfo
));
}
for
(
int32_t
f
=
0
;
f
<
numOfFilters
;
++
f
)
{
SColumnFilterInfo
*
pFilterInfo
=
(
SColumnFilterInfo
*
)
pMsg
;
SColumnFilterInfo
*
pDestFilterInfo
=
&
pQueryTableMsg
->
colList
[
col
].
filters
[
f
];
pDestFilterInfo
->
filterOnBinary
=
htons
(
pFilterInfo
->
filterOnBinary
);
pMsg
+=
sizeof
(
SColumnFilterInfo
);
if
(
pDestFilterInfo
->
filterOnBinary
)
{
pDestFilterInfo
->
len
=
htobe64
(
pFilterInfo
->
len
);
pDestFilterInfo
->
pz
=
(
int64_t
)
calloc
(
1
,
pDestFilterInfo
->
len
+
1
);
memcpy
((
void
*
)
pDestFilterInfo
->
pz
,
pMsg
,
pDestFilterInfo
->
len
+
1
);
pMsg
+=
(
pDestFilterInfo
->
len
+
1
);
}
else
{
pDestFilterInfo
->
lowerBndi
=
htobe64
(
pFilterInfo
->
lowerBndi
);
pDestFilterInfo
->
upperBndi
=
htobe64
(
pFilterInfo
->
upperBndi
);
}
pDestFilterInfo
->
lowerRelOptr
=
htons
(
pFilterInfo
->
lowerRelOptr
);
pDestFilterInfo
->
upperRelOptr
=
htons
(
pFilterInfo
->
upperRelOptr
);
}
}
bool
hasArithmeticFunction
=
false
;
/*
* 1. simple projection query on meters, we only record the pSqlFuncExprs[i].colIdx value
* 2. for complex queries, whole SqlExprs object is required.
*/
pQueryTableMsg
->
pSqlFuncExprs
=
(
int64_t
)
malloc
(
POINTER_BYTES
*
pQueryTableMsg
->
numOfOutputCols
);
SSqlFuncExprMsg
*
pExprMsg
=
(
SSqlFuncExprMsg
*
)
pMsg
;
for
(
int32_t
i
=
0
;
i
<
pQueryTableMsg
->
numOfOutputCols
;
++
i
)
{
((
SSqlFuncExprMsg
**
)
pQueryTableMsg
->
pSqlFuncExprs
)[
i
]
=
pExprMsg
;
pExprMsg
->
colInfo
.
colIdx
=
htons
(
pExprMsg
->
colInfo
.
colIdx
);
pExprMsg
->
colInfo
.
colId
=
htons
(
pExprMsg
->
colInfo
.
colId
);
pExprMsg
->
colInfo
.
flag
=
htons
(
pExprMsg
->
colInfo
.
flag
);
pExprMsg
->
functionId
=
htons
(
pExprMsg
->
functionId
);
pExprMsg
->
numOfParams
=
htons
(
pExprMsg
->
numOfParams
);
pMsg
+=
sizeof
(
SSqlFuncExprMsg
);
for
(
int32_t
j
=
0
;
j
<
pExprMsg
->
numOfParams
;
++
j
)
{
pExprMsg
->
arg
[
j
].
argType
=
htons
(
pExprMsg
->
arg
[
j
].
argType
);
pExprMsg
->
arg
[
j
].
argBytes
=
htons
(
pExprMsg
->
arg
[
j
].
argBytes
);
if
(
pExprMsg
->
arg
[
j
].
argType
==
TSDB_DATA_TYPE_BINARY
)
{
pExprMsg
->
arg
[
j
].
argValue
.
pz
=
pMsg
;
pMsg
+=
pExprMsg
->
arg
[
j
].
argBytes
+
1
;
// one more for the string terminated char.
}
else
{
pExprMsg
->
arg
[
j
].
argValue
.
i64
=
htobe64
(
pExprMsg
->
arg
[
j
].
argValue
.
i64
);
}
}
if
(
pExprMsg
->
functionId
==
TSDB_FUNC_ARITHM
)
{
hasArithmeticFunction
=
true
;
}
else
if
(
pExprMsg
->
functionId
==
TSDB_FUNC_TAG
||
pExprMsg
->
functionId
==
TSDB_FUNC_TAGPRJ
||
pExprMsg
->
functionId
==
TSDB_FUNC_TAG_DUMMY
)
{
if
(
pExprMsg
->
colInfo
.
flag
!=
TSDB_COL_TAG
)
{
// ignore the column index check for arithmetic expression.
return
TSDB_CODE_INVALID_QUERY_MSG
;
}
}
else
{
if
(
!
vnodeValidateExprColumnInfo
(
pQueryTableMsg
,
pExprMsg
))
{
return
TSDB_CODE_INVALID_QUERY_MSG
;
}
}
pExprMsg
=
(
SSqlFuncExprMsg
*
)
pMsg
;
}
pQueryTableMsg
->
colNameLen
=
htonl
(
pQueryTableMsg
->
colNameLen
);
if
(
hasArithmeticFunction
)
{
// column name array
assert
(
pQueryTableMsg
->
colNameLen
>
0
);
pQueryTableMsg
->
colNameList
=
(
int64_t
)
pMsg
;
pMsg
+=
pQueryTableMsg
->
colNameLen
;
}
STableIdInfo
**
pSids
=
(
STableIdInfo
**
)
calloc
(
pQueryTableMsg
->
numOfTables
,
sizeof
(
STableIdInfo
*
));
pQueryTableMsg
->
pSidExtInfo
=
(
uint64_t
)
pSids
;
pSids
[
0
]
=
(
STableIdInfo
*
)
pMsg
;
pSids
[
0
]
->
sid
=
htonl
(
pSids
[
0
]
->
sid
);
pSids
[
0
]
->
uid
=
htobe64
(
pSids
[
0
]
->
uid
);
pSids
[
0
]
->
key
=
htobe64
(
pSids
[
0
]
->
key
);
for
(
int32_t
j
=
1
;
j
<
pQueryTableMsg
->
numOfTables
;
++
j
)
{
pSids
[
j
]
=
(
STableIdInfo
*
)((
char
*
)
pSids
[
j
-
1
]
+
sizeof
(
STableIdInfo
)
+
pQueryTableMsg
->
tagLength
);
pSids
[
j
]
->
sid
=
htonl
(
pSids
[
j
]
->
sid
);
pSids
[
j
]
->
uid
=
htobe64
(
pSids
[
j
]
->
uid
);
pSids
[
j
]
->
key
=
htobe64
(
pSids
[
j
]
->
key
);
}
pMsg
=
(
char
*
)
pSids
[
pQueryTableMsg
->
numOfTables
-
1
];
pMsg
+=
sizeof
(
STableIdInfo
)
+
pQueryTableMsg
->
tagLength
;
if
(
pQueryTableMsg
->
numOfGroupCols
>
0
||
pQueryTableMsg
->
numOfTagsCols
>
0
)
{
// group by tag columns
pQueryTableMsg
->
pTagSchema
=
(
uint64_t
)
pMsg
;
SSchema
*
pTagSchema
=
(
SSchema
*
)
pQueryTableMsg
->
pTagSchema
;
pMsg
+=
sizeof
(
SSchema
)
*
pQueryTableMsg
->
numOfTagsCols
;
if
(
pQueryTableMsg
->
numOfGroupCols
>
0
)
{
pQueryTableMsg
->
groupbyTagIds
=
(
uint64_t
)
&
(
pTagSchema
[
pQueryTableMsg
->
numOfTagsCols
]);
}
else
{
pQueryTableMsg
->
groupbyTagIds
=
0
;
}
pQueryTableMsg
->
orderByIdx
=
htons
(
pQueryTableMsg
->
orderByIdx
);
pQueryTableMsg
->
orderType
=
htons
(
pQueryTableMsg
->
orderType
);
pMsg
+=
sizeof
(
SColIndexEx
)
*
pQueryTableMsg
->
numOfGroupCols
;
}
else
{
pQueryTableMsg
->
pTagSchema
=
0
;
pQueryTableMsg
->
groupbyTagIds
=
0
;
}
pQueryTableMsg
->
interpoType
=
htons
(
pQueryTableMsg
->
interpoType
);
if
(
pQueryTableMsg
->
interpoType
!=
TSDB_INTERPO_NONE
)
{
pQueryTableMsg
->
defaultVal
=
(
uint64_t
)(
pMsg
);
int64_t
*
v
=
(
int64_t
*
)
pMsg
;
for
(
int32_t
i
=
0
;
i
<
pQueryTableMsg
->
numOfOutputCols
;
++
i
)
{
v
[
i
]
=
htobe64
(
v
[
i
]);
}
}
dTrace
(
"qmsg:%p query on %d meter(s), qrange:%"
PRId64
"-%"
PRId64
", numOfGroupbyTagCols:%d, numOfTagCols:%d, timestamp order:%d, "
"tags order:%d, tags order col:%d, numOfOutputCols:%d, numOfCols:%d, interval:%"
PRId64
", fillType:%d, comptslen:%d, limit:%"
PRId64
", "
"offset:%"
PRId64
,
pQueryTableMsg
,
pQueryTableMsg
->
numOfTables
,
pQueryTableMsg
->
window
.
skey
,
pQueryTableMsg
->
window
.
ekey
,
pQueryTableMsg
->
numOfGroupCols
,
pQueryTableMsg
->
numOfTagsCols
,
pQueryTableMsg
->
order
,
pQueryTableMsg
->
orderType
,
pQueryTableMsg
->
orderByIdx
,
pQueryTableMsg
->
numOfOutputCols
,
pQueryTableMsg
->
numOfCols
,
pQueryTableMsg
->
intervalTime
,
pQueryTableMsg
->
interpoType
,
pQueryTableMsg
->
tsLen
,
pQueryTableMsg
->
limit
,
pQueryTableMsg
->
offset
);
return
0
;
}
static
int32_t
buildAirthmeticExprFromMsg
(
SSqlFunctionExpr
*
pExpr
,
SQueryTableMsg
*
pQueryMsg
)
{
SSqlBinaryExprInfo
*
pBinaryExprInfo
=
&
pExpr
->
binExprInfo
;
SColumnInfo
*
pColMsg
=
pQueryMsg
->
colList
;
#if 0
tSQLBinaryExpr* pBinExpr = NULL;
SSchema* pSchema = toSchema(pQueryMsg, pColMsg, pQueryMsg->numOfCols);
dTrace("qmsg:%p create binary expr from string:%s", pQueryMsg, pExpr->pBase.arg[0].argValue.pz);
tSQLBinaryExprFromString(&pBinExpr, pSchema, pQueryMsg->numOfCols, pExpr->pBase.arg[0].argValue.pz,
pExpr->pBase.arg[0].argBytes);
if (pBinExpr == NULL) {
dError("qmsg:%p failed to create arithmetic expression string from:%s", pQueryMsg, pExpr->pBase.arg[0].argValue.pz);
return TSDB_CODE_APP_ERROR;
}
pBinaryExprInfo->pBinExpr = pBinExpr;
int32_t num = 0;
int16_t ids[TSDB_MAX_COLUMNS] = {0};
tSQLBinaryExprTrv(pBinExpr, &num, ids);
qsort(ids, num, sizeof(int16_t), id_compar);
int32_t i = 0, j = 0;
while (i < num && j < num) {
if (ids[i] == ids[j]) {
j++;
} else {
ids[++i] = ids[j++];
}
}
assert(i <= num);
// there may be duplicated referenced columns.
num = i + 1;
pBinaryExprInfo->pReqColumns = malloc(sizeof(SColIndexEx) * num);
for (int32_t k = 0; k < num; ++k) {
SColIndexEx* pColIndex = &pBinaryExprInfo->pReqColumns[k];
pColIndex->colId = ids[k];
}
pBinaryExprInfo->numOfCols = num;
free(pSchema);
#endif
return
TSDB_CODE_SUCCESS
;
}
int32_t
createSqlFunctionExprFromMsg
(
SQueryTableMsg
*
pQueryMsg
,
SSqlFunctionExpr
**
pSqlFuncExpr
)
{
*
pSqlFuncExpr
=
NULL
;
SSqlFunctionExpr
*
pExprs
=
(
SSqlFunctionExpr
*
)
calloc
(
1
,
sizeof
(
SSqlFunctionExpr
)
*
pQueryMsg
->
numOfOutputCols
);
if
(
pExprs
==
NULL
)
{
tfree
(
pQueryMsg
->
pSqlFuncExprs
);
return
TSDB_CODE_SERV_OUT_OF_MEMORY
;
}
bool
isSuperTable
=
QUERY_IS_STABLE_QUERY
(
pQueryMsg
->
queryType
);
int16_t
tagLen
=
0
;
SSchema
*
pTagSchema
=
(
SSchema
*
)
pQueryMsg
->
pTagSchema
;
for
(
int32_t
i
=
0
;
i
<
pQueryMsg
->
numOfOutputCols
;
++
i
)
{
pExprs
[
i
].
pBase
=
*
((
SSqlFuncExprMsg
**
)
pQueryMsg
->
pSqlFuncExprs
)[
i
];
pExprs
[
i
].
resBytes
=
0
;
int16_t
type
=
0
;
int16_t
bytes
=
0
;
SColIndexEx
*
pColumnIndexExInfo
=
&
pExprs
[
i
].
pBase
.
colInfo
;
// tag column schema is kept in pQueryMsg->pColumnModel
if
(
TSDB_COL_IS_TAG
(
pColumnIndexExInfo
->
flag
))
{
if
(
pColumnIndexExInfo
->
colIdx
>=
pQueryMsg
->
numOfTagsCols
)
{
tfree
(
pExprs
);
return
TSDB_CODE_INVALID_QUERY_MSG
;
}
type
=
pTagSchema
[
pColumnIndexExInfo
->
colIdx
].
type
;
bytes
=
pTagSchema
[
pColumnIndexExInfo
->
colIdx
].
bytes
;
}
else
{
// parse the arithmetic expression
// if (pExprs[i].pBase.functionId == TSDB_FUNC_ARITHM) {
// *code = buildAirthmeticExprFromMsg(&pExprs[i], pQueryMsg);
//
// if (*code != TSDB_CODE_SUCCESS) {
// tfree(pExprs);
// return NULL;
// }
//
// type = TSDB_DATA_TYPE_DOUBLE;
// bytes = tDataTypeDesc[type].nSize;
// } else { // parse the normal column
// int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].pBase);
// assert(j < pQueryMsg->numOfCols);
//
// SColumnInfo* pCol = &pQueryMsg->colList[j];
// type = pCol->type;
// bytes = pCol->bytes;
// }
}
int32_t
param
=
pExprs
[
i
].
pBase
.
arg
[
0
].
argValue
.
i64
;
if
(
getResultDataInfo
(
type
,
bytes
,
pExprs
[
i
].
pBase
.
functionId
,
param
,
&
pExprs
[
i
].
resType
,
&
pExprs
[
i
].
resBytes
,
&
pExprs
[
i
].
interResBytes
,
0
,
isSuperTable
)
!=
TSDB_CODE_SUCCESS
)
{
tfree
(
pExprs
);
return
TSDB_CODE_INVALID_QUERY_MSG
;
}
if
(
pExprs
[
i
].
pBase
.
functionId
==
TSDB_FUNC_TAG_DUMMY
||
pExprs
[
i
].
pBase
.
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
tagLen
+=
pExprs
[
i
].
resBytes
;
}
assert
(
isValidDataType
(
pExprs
[
i
].
resType
,
pExprs
[
i
].
resBytes
));
}
// get the correct result size for top/bottom query, according to the number of tags columns in selection clause
// TODO refactor
for
(
int32_t
i
=
0
;
i
<
pQueryMsg
->
numOfOutputCols
;
++
i
)
{
pExprs
[
i
].
pBase
=
*
((
SSqlFuncExprMsg
**
)
pQueryMsg
->
pSqlFuncExprs
)[
i
];
int16_t
functId
=
pExprs
[
i
].
pBase
.
functionId
;
if
(
functId
==
TSDB_FUNC_TOP
||
functId
==
TSDB_FUNC_BOTTOM
)
{
int32_t
j
=
getColumnIndexInSource
(
pQueryMsg
,
&
pExprs
[
i
].
pBase
);
assert
(
j
<
pQueryMsg
->
numOfCols
);
SColumnInfo
*
pCol
=
&
pQueryMsg
->
colList
[
j
];
int16_t
type
=
pCol
->
type
;
int16_t
bytes
=
pCol
->
bytes
;
int32_t
ret
=
getResultDataInfo
(
type
,
bytes
,
pExprs
[
i
].
pBase
.
functionId
,
pExprs
[
i
].
pBase
.
arg
[
0
].
argValue
.
i64
,
&
pExprs
[
i
].
resType
,
&
pExprs
[
i
].
resBytes
,
&
pExprs
[
i
].
interResBytes
,
tagLen
,
isSuperTable
);
assert
(
ret
==
TSDB_CODE_SUCCESS
);
}
}
tfree
(
pQueryMsg
->
pSqlFuncExprs
);
*
pSqlFuncExpr
=
pExprs
;
return
TSDB_CODE_SUCCESS
;
}
SSqlGroupbyExpr
*
createGroupbyExprFromMsg
(
SQueryTableMsg
*
pQueryMsg
,
int32_t
*
code
)
{
if
(
pQueryMsg
->
numOfGroupCols
==
0
)
{
return
NULL
;
}
// using group by tag columns
SSqlGroupbyExpr
*
pGroupbyExpr
=
(
SSqlGroupbyExpr
*
)
malloc
(
sizeof
(
SSqlGroupbyExpr
)
+
pQueryMsg
->
numOfGroupCols
*
sizeof
(
SColIndexEx
));
if
(
pGroupbyExpr
==
NULL
)
{
*
code
=
TSDB_CODE_SERV_OUT_OF_MEMORY
;
return
NULL
;
}
SColIndexEx
*
pGroupbyColInfo
=
(
SColIndexEx
*
)
pQueryMsg
->
groupbyTagIds
;
pGroupbyExpr
->
numOfGroupCols
=
pQueryMsg
->
numOfGroupCols
;
pGroupbyExpr
->
orderType
=
pQueryMsg
->
orderType
;
pGroupbyExpr
->
orderIndex
=
pQueryMsg
->
orderByIdx
;
memcpy
(
pGroupbyExpr
->
columnInfo
,
pGroupbyColInfo
,
sizeof
(
SColIndexEx
)
*
pGroupbyExpr
->
numOfGroupCols
);
return
pGroupbyExpr
;
}
int32_t
vnodeCreateFilterInfo
(
void
*
pQInfo
,
SQuery
*
pQuery
)
{
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfCols
;
++
i
)
{
if
(
pQuery
->
colList
[
i
].
info
.
numOfFilters
>
0
)
{
pQuery
->
numOfFilterCols
++
;
}
}
if
(
pQuery
->
numOfFilterCols
==
0
)
{
return
TSDB_CODE_SUCCESS
;
}
pQuery
->
pFilterInfo
=
calloc
(
1
,
sizeof
(
SSingleColumnFilterInfo
)
*
pQuery
->
numOfFilterCols
);
for
(
int32_t
i
=
0
,
j
=
0
;
i
<
pQuery
->
numOfCols
;
++
i
)
{
if
(
pQuery
->
colList
[
i
].
info
.
numOfFilters
>
0
)
{
SSingleColumnFilterInfo
*
pFilterInfo
=
&
pQuery
->
pFilterInfo
[
j
];
memcpy
(
&
pFilterInfo
->
info
,
&
pQuery
->
colList
[
i
],
sizeof
(
SColumnInfoEx
));
pFilterInfo
->
info
.
info
.
filters
=
NULL
;
pFilterInfo
->
numOfFilters
=
pQuery
->
colList
[
i
].
info
.
numOfFilters
;
pFilterInfo
->
pFilters
=
calloc
(
pFilterInfo
->
numOfFilters
,
sizeof
(
SColumnFilterElem
));
for
(
int32_t
f
=
0
;
f
<
pFilterInfo
->
numOfFilters
;
++
f
)
{
SColumnFilterElem
*
pSingleColFilter
=
&
pFilterInfo
->
pFilters
[
f
];
pSingleColFilter
->
filterInfo
=
pQuery
->
colList
[
i
].
info
.
filters
[
f
];
int32_t
lower
=
pSingleColFilter
->
filterInfo
.
lowerRelOptr
;
int32_t
upper
=
pSingleColFilter
->
filterInfo
.
upperRelOptr
;
if
(
lower
==
TSDB_RELATION_INVALID
&&
upper
==
TSDB_RELATION_INVALID
)
{
dError
(
"QInfo:%p invalid filter info"
,
pQInfo
);
return
TSDB_CODE_INVALID_QUERY_MSG
;
}
int16_t
type
=
pQuery
->
colList
[
i
].
info
.
type
;
int16_t
bytes
=
pQuery
->
colList
[
i
].
info
.
bytes
;
__filter_func_t
*
rangeFilterArray
=
NULL
;
// vnodeGetRangeFilterFuncArray(type);
__filter_func_t
*
filterArray
=
NULL
;
// vnodeGetValueFilterFuncArray(type);
if
(
rangeFilterArray
==
NULL
&&
filterArray
==
NULL
)
{
dError
(
"QInfo:%p failed to get filter function, invalid data type:%d"
,
pQInfo
,
type
);
return
TSDB_CODE_INVALID_QUERY_MSG
;
}
if
((
lower
==
TSDB_RELATION_LARGE_EQUAL
||
lower
==
TSDB_RELATION_LARGE
)
&&
(
upper
==
TSDB_RELATION_LESS_EQUAL
||
upper
==
TSDB_RELATION_LESS
))
{
if
(
lower
==
TSDB_RELATION_LARGE_EQUAL
)
{
if
(
upper
==
TSDB_RELATION_LESS_EQUAL
)
{
pSingleColFilter
->
fp
=
rangeFilterArray
[
4
];
}
else
{
pSingleColFilter
->
fp
=
rangeFilterArray
[
2
];
}
}
else
{
if
(
upper
==
TSDB_RELATION_LESS_EQUAL
)
{
pSingleColFilter
->
fp
=
rangeFilterArray
[
3
];
}
else
{
pSingleColFilter
->
fp
=
rangeFilterArray
[
1
];
}
}
}
else
{
// set callback filter function
if
(
lower
!=
TSDB_RELATION_INVALID
)
{
pSingleColFilter
->
fp
=
filterArray
[
lower
];
if
(
upper
!=
TSDB_RELATION_INVALID
)
{
dError
(
"pQInfo:%p failed to get filter function, invalid filter condition"
,
pQInfo
,
type
);
return
TSDB_CODE_INVALID_QUERY_MSG
;
}
}
else
{
pSingleColFilter
->
fp
=
filterArray
[
upper
];
}
}
assert
(
pSingleColFilter
->
fp
!=
NULL
);
pSingleColFilter
->
bytes
=
bytes
;
}
j
++
;
}
}
return
TSDB_CODE_SUCCESS
;
}
static
SQInfo
*
createQInfoImpl
(
SQueryTableMsg
*
pQueryMsg
,
SSqlGroupbyExpr
*
pGroupbyExpr
,
SSqlFunctionExpr
*
pExprs
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
calloc
(
1
,
sizeof
(
SQInfo
));
if
(
pQInfo
==
NULL
)
{
goto
_clean_memory
;
}
SQuery
*
pQuery
=
calloc
(
1
,
sizeof
(
SQuery
));
pQInfo
->
runtimeEnv
.
pQuery
=
pQuery
;
int16_t
numOfCols
=
pQueryMsg
->
numOfCols
;
int16_t
numOfOutputCols
=
pQueryMsg
->
numOfOutputCols
;
pQuery
->
numOfCols
=
numOfCols
;
pQuery
->
numOfOutputCols
=
numOfOutputCols
;
pQuery
->
limit
.
limit
=
pQueryMsg
->
limit
;
pQuery
->
limit
.
offset
=
pQueryMsg
->
offset
;
pQuery
->
order
.
order
=
pQueryMsg
->
order
;
pQuery
->
order
.
orderColId
=
pQueryMsg
->
orderColId
;
pQuery
->
pSelectExpr
=
pExprs
;
pQuery
->
pGroupbyExpr
=
pGroupbyExpr
;
pQuery
->
intervalTime
=
pQueryMsg
->
intervalTime
;
pQuery
->
slidingTime
=
pQueryMsg
->
slidingTime
;
pQuery
->
slidingTimeUnit
=
pQueryMsg
->
slidingTimeUnit
;
pQuery
->
interpoType
=
pQueryMsg
->
interpoType
;
pQuery
->
colList
=
calloc
(
1
,
sizeof
(
SSingleColumnFilterInfo
)
*
numOfCols
);
if
(
pQuery
->
colList
==
NULL
)
{
goto
_clean_memory
;
}
// calculate the result row size
for
(
int16_t
col
=
0
;
col
<
numOfOutputCols
;
++
col
)
{
assert
(
pExprs
[
col
].
resBytes
>
0
);
pQuery
->
rowSize
+=
pExprs
[
col
].
resBytes
;
}
int32_t
ret
=
vnodeCreateFilterInfo
(
pQInfo
,
pQuery
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
goto
_clean_memory
;
}
// prepare the result buffer
pQuery
->
sdata
=
(
SData
**
)
calloc
(
pQuery
->
numOfOutputCols
,
sizeof
(
SData
*
));
if
(
pQuery
->
sdata
==
NULL
)
{
goto
_clean_memory
;
}
for
(
int32_t
col
=
0
;
col
<
pQuery
->
numOfOutputCols
;
++
col
)
{
assert
(
pExprs
[
col
].
interResBytes
>=
pExprs
[
col
].
resBytes
);
// allocate additional memory for interResults that are usually larger then final results
// size_t size = (pQInfo->query.pointsToRead + 1) * pExprs[col].resBytes + pExprs[col].interResBytes +
// sizeof(SData);
size_t
size
=
1000
;
pQuery
->
sdata
[
col
]
=
(
SData
*
)
calloc
(
1
,
size
);
if
(
pQuery
->
sdata
[
col
]
==
NULL
)
{
goto
_clean_memory
;
}
}
if
(
pQuery
->
interpoType
!=
TSDB_INTERPO_NONE
)
{
pQuery
->
defaultVal
=
malloc
(
sizeof
(
int64_t
)
*
pQuery
->
numOfOutputCols
);
if
(
pQuery
->
defaultVal
==
NULL
)
{
goto
_clean_memory
;
}
// the first column is the timestamp
memcpy
(
pQuery
->
defaultVal
,
(
char
*
)
pQueryMsg
->
defaultVal
,
pQuery
->
numOfOutputCols
*
sizeof
(
int64_t
));
}
// to make sure third party won't overwrite this structure
pQInfo
->
signature
=
(
uint64_t
)
pQInfo
;
pQuery
->
pos
=
-
1
;
// dTrace("vid:%d sid:%d meterId:%s, QInfo is allocated:%p", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId,
// pQInfo);
return
pQInfo
;
_clean_memory:
tfree
(
pQuery
->
defaultVal
);
if
(
pQuery
->
sdata
!=
NULL
)
{
for
(
int16_t
col
=
0
;
col
<
pQuery
->
numOfOutputCols
;
++
col
)
{
tfree
(
pQuery
->
sdata
[
col
]);
}
}
tfree
(
pQuery
->
sdata
);
tfree
(
pQuery
->
pFilterInfo
);
tfree
(
pQuery
->
colList
);
tfree
(
pExprs
);
tfree
(
pGroupbyExpr
);
tfree
(
pQInfo
);
return
NULL
;
}
int32_t
createQInfo
(
SQueryTableMsg
*
pQueryMsg
,
SSqlGroupbyExpr
*
pGroupbyExpr
,
SSqlFunctionExpr
*
pSqlExprs
,
SQInfo
**
pQInfo
)
{
SQuery
*
pQuery
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
(
*
pQInfo
)
=
createQInfoImpl
(
pQueryMsg
,
pGroupbyExpr
,
pSqlExprs
);
if
(
pQInfo
==
NULL
)
{
code
=
TSDB_CODE_SERV_OUT_OF_MEMORY
;
goto
_error
;
}
pQuery
=
(
*
pQInfo
)
->
runtimeEnv
.
pQuery
;
dTrace
(
"qmsg:%p create QInfo:%p, QInfo created"
,
pQueryMsg
,
pQInfo
);
STableIdInfo
**
pSids
=
(
STableIdInfo
**
)
pQueryMsg
->
pSidExtInfo
;
if
(
pSids
!=
NULL
&&
pSids
[
0
]
->
key
>
0
)
{
pQuery
->
window
.
skey
=
pSids
[
0
]
->
key
;
}
else
{
pQuery
->
window
.
skey
=
pQueryMsg
->
window
.
skey
;
}
pQuery
->
window
.
ekey
=
pQueryMsg
->
window
.
ekey
;
pQuery
->
lastKey
=
pQuery
->
window
.
skey
;
if
(
sem_init
(
&
(
*
pQInfo
)
->
dataReady
,
0
,
0
)
!=
0
)
{
// dError("QInfo:%p vid:%d sid:%d meterId:%s, init dataReady sem failed, reason:%s", pQInfo, pMeterObj->vnode,
// pMeterObj->sid, pMeterObj->meterId, strerror(errno));
code
=
TSDB_CODE_APP_ERROR
;
goto
_error
;
}
vnodeParametersSafetyCheck
(
pQuery
);
(
*
pQInfo
)
->
pTableList
=
taosHashInit
(
pQueryMsg
->
numOfTables
,
taosIntHash_32
,
false
);
// taosHashPut(pSupporter->pMetersHashTable, (const char *)&pMetersObj[0]->sid, sizeof(pMeterObj[0].sid),
// (char *)&pMetersObj[0], POINTER_BYTES);
STSBuf
*
pTSBuf
=
NULL
;
if
(
pQueryMsg
->
tsLen
>
0
)
{
// open new file to save the result
char
*
tsBlock
=
(
char
*
)
pQueryMsg
+
pQueryMsg
->
tsOffset
;
pTSBuf
=
tsBufCreateFromCompBlocks
(
tsBlock
,
pQueryMsg
->
tsNumOfBlocks
,
pQueryMsg
->
tsLen
,
pQueryMsg
->
tsOrder
);
tsBufResetPos
(
pTSBuf
);
tsBufNextPos
(
pTSBuf
);
}
if
((
code
=
vnodeQueryTablePrepare
(
*
pQInfo
,
pTSBuf
))
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
// if (pQInfo->over == 1) {
// vnodeAddRefCount(pQInfo); // for retrieve procedure
// return pQInfo;
// }
// dTrace("QInfo:%p set query flag and prepare runtime environment completed, ref:%d, wait for schedule", pQInfo,
// pQInfo->refCount);
// taosScheduleTask(queryQhandle, &schedMsg);
return
code
;
_error:
// table query ref will be decrease during error handling
// vnodeFreeQInfo(pQInfo, false);
return
code
;
}
int32_t
qCreateQueryInfo
(
SQueryTableMsg
*
pQueryTableMsg
,
SQInfo
**
pQInfo
)
{
assert
(
pQueryTableMsg
!=
NULL
);
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
((
code
=
convertQueryTableMsg
(
pQueryTableMsg
))
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
if
(
pQueryTableMsg
->
numOfTables
<=
0
)
{
dError
(
"Invalid number of tables to query, numOfTables:%d"
,
pQueryTableMsg
->
numOfTables
);
code
=
TSDB_CODE_INVALID_QUERY_MSG
;
goto
_query_over
;
}
// todo check vnode status
if
(
pQueryTableMsg
->
pSidExtInfo
==
0
)
{
dError
(
"qmsg:%p, SQueryTableMsg wrong format"
,
pQueryTableMsg
);
code
=
TSDB_CODE_INVALID_QUERY_MSG
;
goto
_query_over
;
}
SSqlFunctionExpr
*
pExprs
=
NULL
;
if
((
code
=
createSqlFunctionExprFromMsg
(
pQueryTableMsg
,
&
pExprs
))
!=
TSDB_CODE_SUCCESS
)
{
goto
_query_over
;
}
SSqlGroupbyExpr
*
pGroupbyExpr
=
createGroupbyExprFromMsg
(
pQueryTableMsg
,
&
code
);
if
((
pGroupbyExpr
==
NULL
&&
pQueryTableMsg
->
numOfGroupCols
!=
0
)
||
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_query_over
;
}
if
(
QUERY_IS_STABLE_QUERY
(
pQueryTableMsg
->
queryType
))
{
// pObj->qhandle = vnodeQueryOnMultiMeters(pMeterObjList, pGroupbyExpr, pExprs, pQueryTableMsg, &code);
}
else
{
code
=
createQInfo
(
pQueryTableMsg
,
pGroupbyExpr
,
pExprs
,
pQInfo
);
}
_query_over:
// if failed to add ref for all meters in this query, abort current query
// if (code != TSDB_CODE_SUCCESS) {
// vnodeDecQueryRefCount(pQueryTableMsg, pMeterObjList, incNumber);
// }
//
// tfree(pQueryTableMsg->pSqlFuncExprs);
// tfree(pMeterObjList);
// ret = vnodeSendQueryRspMsg(pObj, code, pObj->qhandle);
//
// tfree(pQueryTableMsg->pSidExtInfo);
// for(int32_t i = 0; i < pQueryTableMsg->numOfCols; ++i) {
// vnodeFreeColumnInfo(&pQueryTableMsg->colList[i]);
// }
//
// atomic_fetch_add_32(&vnodeSelectReqNum, 1);
return
TSDB_CODE_SUCCESS
;
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录