Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7fc487b8
TDengine
项目概览
taosdata
/
TDengine
12 个月 前同步成功
通知
1180
Star
22014
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
7fc487b8
编写于
7月 07, 2020
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/develop' into hotfix/crash
上级
df63307f
0062af55
变更
21
隐藏空白更改
内联
并排
Showing
21 changed file
with
334 addition
and
407 deletion
+334
-407
src/client/inc/tschemautil.h
src/client/inc/tschemautil.h
+0
-2
src/client/src/tscFunctionImpl.c
src/client/src/tscFunctionImpl.c
+1
-1
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+6
-6
src/client/src/tscSchemaUtil.c
src/client/src/tscSchemaUtil.c
+0
-22
src/client/src/tscSubquery.c
src/client/src/tscSubquery.c
+18
-43
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+5
-5
src/common/inc/tname.h
src/common/inc/tname.h
+3
-0
src/common/inc/tvariant.h
src/common/inc/tvariant.h
+0
-0
src/common/src/tname.c
src/common/src/tname.c
+24
-1
src/common/src/tvariant.c
src/common/src/tvariant.c
+0
-0
src/inc/tsdb.h
src/inc/tsdb.h
+2
-1
src/query/inc/qast.h
src/query/inc/qast.h
+3
-10
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+129
-92
src/query/src/qUtil.c
src/query/src/qUtil.c
+1
-2
src/query/src/qast.c
src/query/src/qast.c
+33
-131
src/tsdb/src/tsdbMeta.c
src/tsdb/src/tsdbMeta.c
+0
-5
src/tsdb/src/tsdbRead.c
src/tsdb/src/tsdbRead.c
+38
-35
src/util/inc/tlosertree.h
src/util/inc/tlosertree.h
+0
-0
src/util/src/tlosertree.c
src/util/src/tlosertree.c
+3
-3
src/vnode/src/vnodeRead.c
src/vnode/src/vnodeRead.c
+57
-48
tests/script/general/parser/join.sim
tests/script/general/parser/join.sim
+11
-0
未找到文件。
src/client/inc/tschemautil.h
浏览文件 @
7fc487b8
...
...
@@ -110,8 +110,6 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
//todo tags value as well as the table id structure needs refactor
char
*
tsGetTagsValue
(
STableMeta
*
pMeta
);
void
extractTableNameFromToken
(
SSQLToken
*
pToken
,
SSQLToken
*
pTable
);
#ifdef __cplusplus
}
#endif
...
...
src/client/src/tscFunctionImpl.c
浏览文件 @
7fc487b8
...
...
@@ -14,7 +14,6 @@
*/
#include "os.h"
#include "qast.h"
#include "qextbuffer.h"
#include "qfill.h"
#include "qhistogram.h"
...
...
@@ -23,6 +22,7 @@
#include "qtsbuf.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "qast.h"
#include "tscLog.h"
#include "tscSubquery.h"
#include "tscompression.h"
...
...
src/client/src/tscSQLParser.c
浏览文件 @
7fc487b8
...
...
@@ -18,19 +18,19 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "qast.h"
#include "taos.h"
#include "taosmsg.h"
#include "
tstoken
.h"
#include "t
strbuild
.h"
#include "t
ti
me.h"
#include "
qast
.h"
#include "t
compare
.h"
#include "t
na
me.h"
#include "tscLog.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "tstoken.h"
#include "tstrbuild.h"
#include "ttime.h"
#include "ttokendef.h"
#include "tname.h"
#include "tcompare.h"
#define DEFAULT_PRIMARY_TIMESTAMP_COL_NAME "_c0"
...
...
src/client/src/tscSchemaUtil.c
浏览文件 @
7fc487b8
...
...
@@ -215,25 +215,3 @@ __attribute__ ((unused)) static FORCE_INLINE size_t copy(char* dst, const char*
return
len
;
}
/*
* tablePrefix.columnName
* extract table name and save it in pTable, with only column name in pToken
*/
void
extractTableNameFromToken
(
SSQLToken
*
pToken
,
SSQLToken
*
pTable
)
{
const
char
sep
=
TS_PATH_DELIMITER
[
0
];
if
(
pToken
==
pTable
||
pToken
==
NULL
||
pTable
==
NULL
)
{
return
;
}
char
*
r
=
strnchr
(
pToken
->
z
,
sep
,
pToken
->
n
,
false
);
if
(
r
!=
NULL
)
{
// record the table name token
pTable
->
n
=
r
-
pToken
->
z
;
pTable
->
z
=
pToken
->
z
;
r
+=
1
;
pToken
->
n
-=
(
r
-
pToken
->
z
);
pToken
->
z
=
r
;
}
}
src/client/src/tscSubquery.c
浏览文件 @
7fc487b8
...
...
@@ -14,12 +14,12 @@
*/
#include "os.h"
#include "
tscSubquery
.h"
#include "
qtsbuf
.h"
#include "qast.h"
#include "tcompare.h"
#include "tschemautil.h"
#include "qtsbuf.h"
#include "tscLog.h"
#include "tscSubquery.h"
#include "tschemautil.h"
#include "tsclient.h"
typedef
struct
SInsertSupporter
{
...
...
@@ -57,10 +57,15 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
pSubQueryInfo1
->
tsBuf
=
output1
;
pSubQueryInfo2
->
tsBuf
=
output2
;
// no result generated, return directly
if
(
pSupporter1
->
pTSBuf
==
NULL
||
pSupporter2
->
pTSBuf
==
NULL
)
{
tscDebug
(
"%p at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting"
,
pSql
);
return
0
;
}
tsBufResetPos
(
pSupporter1
->
pTSBuf
);
tsBufResetPos
(
pSupporter2
->
pTSBuf
);
// TODO add more details information
if
(
!
tsBufNextPos
(
pSupporter1
->
pTSBuf
))
{
tsBufFlush
(
output1
);
tsBufFlush
(
output2
);
...
...
@@ -210,6 +215,7 @@ static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
pSupporter
->
f
=
NULL
;
}
tfree
(
pSupporter
->
pIdTagList
);
tscTagCondRelease
(
&
pSupporter
->
tagCond
);
free
(
pSupporter
);
}
...
...
@@ -420,43 +426,6 @@ static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) {
pQueryInfo
->
window
=
*
win
;
}
static
UNUSED_FUNC
void
tSIntersectionAndLaunchSecQuery
(
SJoinSupporter
*
pSupporter
,
SSqlObj
*
pSql
)
{
SSqlObj
*
pParentSql
=
pSupporter
->
pObj
;
SQueryInfo
*
pParentQueryInfo
=
tscGetQueryInfoDetail
(
&
pParentSql
->
cmd
,
pParentSql
->
cmd
.
clauseIndex
);
// if (tscNonOrderedProjectionQueryOnSTable(pParentQueryInfo, 0)) {
// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// assert(pQueryInfo->numOfTables == 1);
//
// // for projection query, need to try next vnode
//// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
// int32_t totalVnode = 0;
// if ((++pTableMetaInfo->vgroupIndex) < totalVnode) {
// tscDebug("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql,
// pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVnode, pRes->numOfTotal);
//
// pSql->cmd.command = TSDB_SQL_SELECT;
// pSql->fp = tscJoinQueryCallback;
// tscProcessSql(pSql);
//
// return;
// }
// }
SJoinSupporter
*
p1
=
pParentSql
->
pSubs
[
0
]
->
param
;
SJoinSupporter
*
p2
=
pParentSql
->
pSubs
[
1
]
->
param
;
STimeWindow
win
=
TSWINDOW_INITIALIZER
;
int64_t
num
=
doTSBlockIntersect
(
pParentSql
,
p1
,
p2
,
&
win
);
if
(
num
<=
0
)
{
// no result during ts intersect
tscDebug
(
"%p free all sub SqlObj and quit"
,
pParentSql
);
freeJoinSubqueryObj
(
pParentSql
);
}
else
{
updateQueryTimeRange
(
pParentQueryInfo
,
&
win
);
tscLaunchRealSubqueries
(
pParentSql
);
}
}
int32_t
tscCompareTidTags
(
const
void
*
p1
,
const
void
*
p2
)
{
const
STidTags
*
t1
=
(
const
STidTags
*
)
varDataVal
(
p1
);
const
STidTags
*
t2
=
(
const
STidTags
*
)
varDataVal
(
p2
);
...
...
@@ -713,9 +682,12 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
SArray
*
s1
=
NULL
,
*
s2
=
NULL
;
getIntersectionOfTableTuple
(
pQueryInfo
,
pParentSql
,
&
s1
,
&
s2
);
if
(
taosArrayGetSize
(
s1
)
==
0
||
taosArrayGetSize
(
s2
)
==
0
)
{
// no results,return.
tscDebug
(
"%p free all sub SqlObj and quit"
,
pParentSql
);
tscDebug
(
"%p
tag intersect does not generated qualified tables for join,
free all sub SqlObj and quit"
,
pParentSql
);
freeJoinSubqueryObj
(
pParentSql
);
// set no result command
pParentSql
->
cmd
.
command
=
TSDB_SQL_RETRIEVE_EMPTY_RESULT
;
(
*
pParentSql
->
fp
)(
pParentSql
->
param
,
pParentSql
,
0
);
}
else
{
// proceed to for ts_comp query
SSqlCmd
*
pSubCmd1
=
&
pParentSql
->
pSubs
[
0
]
->
cmd
;
...
...
@@ -846,7 +818,10 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
if
(
num
<=
0
)
{
// no result during ts intersect
tscDebug
(
"%p no results generated in ts intersection, free all sub SqlObj and quit"
,
pParentSql
);
freeJoinSubqueryObj
(
pParentSql
);
// set no result command
pParentSql
->
cmd
.
command
=
TSDB_SQL_RETRIEVE_EMPTY_RESULT
;
(
*
pParentSql
->
fp
)(
pParentSql
->
param
,
pParentSql
,
0
);
return
;
}
...
...
src/client/src/tscUtil.c
浏览文件 @
7fc487b8
...
...
@@ -14,21 +14,21 @@
*/
#include "os.h"
#include "qast.h"
#include "hash.h"
#include "tscUtil.h"
#include "taosmsg.h"
#include "qast.h"
#include "tcache.h"
#include "tkey.h"
#include "tmd5.h"
#include "tscProfile.h"
#include "tscLocalMerge.h"
#include "tscLog.h"
#include "tscProfile.h"
#include "tscSubquery.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "ttimer.h"
#include "ttokendef.h"
#include "tscLog.h"
#include "tscUtil.h"
#include "hash.h"
static
void
freeQueryInfoImpl
(
SQueryInfo
*
pQueryInfo
);
static
void
clearAllTableMetaInfo
(
SQueryInfo
*
pQueryInfo
,
const
char
*
address
,
bool
removeFromCache
);
...
...
src/common/inc/tname.h
浏览文件 @
7fc487b8
...
...
@@ -3,6 +3,7 @@
#include "os.h"
#include "taosmsg.h"
#include "tstoken.h"
typedef
struct
SDataStatis
{
int16_t
colId
;
...
...
@@ -23,6 +24,8 @@ void extractTableName(const char *tableId, char *name);
char
*
extractDBName
(
const
char
*
tableId
,
char
*
name
);
void
extractTableNameFromToken
(
SSQLToken
*
pToken
,
SSQLToken
*
pTable
);
SSchema
tGetTableNameColumnSchema
();
bool
tscValidateTableNameLength
(
size_t
len
);
...
...
src/
query
/inc/tvariant.h
→
src/
common
/inc/tvariant.h
浏览文件 @
7fc487b8
文件已移动
src/common/src/tname.c
浏览文件 @
7fc487b8
...
...
@@ -81,7 +81,7 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in
return
startTime
;
}
int64_t
start
=
((
startTime
-
interval
Time
)
/
slidingTime
+
1
)
*
slidingTime
;
int64_t
start
=
((
startTime
-
sliding
Time
)
/
slidingTime
+
1
)
*
slidingTime
;
if
(
!
(
timeUnit
==
'a'
||
timeUnit
==
'm'
||
timeUnit
==
's'
||
timeUnit
==
'h'
))
{
/*
* here we revised the start time of day according to the local time zone,
...
...
@@ -105,3 +105,26 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in
}
return
start
;
}
/*
* tablePrefix.columnName
* extract table name and save it in pTable, with only column name in pToken
*/
void
extractTableNameFromToken
(
SSQLToken
*
pToken
,
SSQLToken
*
pTable
)
{
const
char
sep
=
TS_PATH_DELIMITER
[
0
];
if
(
pToken
==
pTable
||
pToken
==
NULL
||
pTable
==
NULL
)
{
return
;
}
char
*
r
=
strnchr
(
pToken
->
z
,
sep
,
pToken
->
n
,
false
);
if
(
r
!=
NULL
)
{
// record the table name token
pTable
->
n
=
r
-
pToken
->
z
;
pTable
->
z
=
pToken
->
z
;
r
+=
1
;
pToken
->
n
-=
(
r
-
pToken
->
z
);
pToken
->
z
=
r
;
}
}
src/
query
/src/tvariant.c
→
src/
common
/src/tvariant.c
浏览文件 @
7fc487b8
文件已移动
src/inc/tsdb.h
浏览文件 @
7fc487b8
...
...
@@ -235,9 +235,10 @@ bool tsdbNextDataBlock(TsdbQueryHandleT *pQueryHandle);
* Get current data block information
*
* @param pQueryHandle
* @param pBlockInfo
* @return
*/
SDataBlockInfo
tsdbRetrieveDataBlockInfo
(
TsdbQueryHandleT
*
pQueryHandle
);
void
tsdbRetrieveDataBlockInfo
(
TsdbQueryHandleT
*
pQueryHandle
,
SDataBlockInfo
*
pBlockInfo
);
/**
*
...
...
src/query/inc/qast.h
浏览文件 @
7fc487b8
...
...
@@ -16,16 +16,16 @@
#ifndef TDENGINE_TAST_H
#define TDENGINE_TAST_H
#include <tbuffer.h>
#ifdef __cplusplus
extern
"C"
{
#endif
#include <tskiplist.h>
#include "os.h"
#include "taosmsg.h"
#include "taosdef.h"
#include "tskiplist.h"
#include "tbuffer.h"
#include "tvariant.h"
struct
tExprNode
;
...
...
@@ -75,10 +75,6 @@ typedef struct tExprNode {
};
}
tExprNode
;
void
tSQLBinaryExprFromString
(
tExprNode
**
pExpr
,
SSchema
*
pSchema
,
int32_t
numOfCols
,
char
*
src
,
int32_t
len
);
void
tSQLBinaryExprToString
(
tExprNode
*
pExpr
,
char
*
dst
,
int32_t
*
len
);
void
tExprTreeDestroy
(
tExprNode
**
pExprs
,
void
(
*
fp
)(
void
*
));
void
tExprTreeTraverse
(
tExprNode
*
pExpr
,
SSkipList
*
pSkipList
,
SArray
*
result
,
SExprTraverseSupp
*
param
);
...
...
@@ -86,12 +82,9 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
void
tExprTreeCalcTraverse
(
tExprNode
*
pExprs
,
int32_t
numOfRows
,
char
*
pOutput
,
void
*
param
,
int32_t
order
,
char
*
(
*
cb
)(
void
*
,
const
char
*
,
int32_t
));
// todo refactor: remove it
void
tSQLBinaryExprTrv
(
tExprNode
*
pExprs
,
SArray
*
res
);
uint8_t
getBinaryExprOptr
(
SSQLToken
*
pToken
);
void
tExprNodeDestroy
(
tExprNode
*
pNode
,
void
(
*
fp
)(
void
*
));
void
tExprNodeDestroy
(
tExprNode
*
pNode
,
void
(
*
fp
)(
void
*
));
void
exprTreeToBinary
(
SBufferWriter
*
bw
,
tExprNode
*
pExprTree
);
tExprNode
*
exprTreeFromBinary
(
const
void
*
data
,
size_t
size
);
...
...
src/query/src/qExecutor.c
浏览文件 @
7fc487b8
...
...
@@ -18,18 +18,18 @@
#include "qfill.h"
#include "taosmsg.h"
#include "exception.h"
#include "hash.h"
#include "qExecutor.h"
#include "qUtil.h"
#include "qast.h"
#include "qresultBuf.h"
#include "query.h"
#include "queryLog.h"
#include "qast.h"
#include "tfile.h"
#include "tlosertree.h"
#include "exception.h"
#include "tscompression.h"
#include "ttime.h"
#include "tfile.h"
/**
* check if the primary column is load by default, otherwise, the program will
...
...
@@ -49,6 +49,8 @@
#define GET_COL_DATA_POS(query, index, step) ((query)->pos + (index) * (step))
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
#define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0}
/* get the qinfo struct address from the query struct address */
#define GET_COLUMN_BYTES(query, colidx) \
((query)->colList[(query)->pSelectExpr[colidx].base.colInfo.colIndex].bytes)
...
...
@@ -120,7 +122,6 @@ static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) {
static
void
setQueryStatus
(
SQuery
*
pQuery
,
int8_t
status
);
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->intervalTime > 0)
static
bool
isIntervalQuery
(
SQuery
*
pQuery
)
{
return
pQuery
->
intervalTime
>
0
;
}
// todo move to utility
static
int32_t
mergeIntoGroupResultImpl
(
SQInfo
*
pQInfo
,
SArray
*
group
);
...
...
@@ -397,34 +398,38 @@ static bool hasNullValue(SQuery *pQuery, int32_t col, int32_t numOfCols, SDataSt
}
static
SWindowResult
*
doSetTimeWindowFromKey
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SWindowResInfo
*
pWindowResInfo
,
char
*
pData
,
int16_t
bytes
)
{
int16_t
bytes
,
bool
masterscan
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
int32_t
*
p1
=
(
int32_t
*
)
taosHashGet
(
pWindowResInfo
->
hashList
,
pData
,
bytes
);
if
(
p1
!=
NULL
)
{
pWindowResInfo
->
curIndex
=
*
p1
;
}
else
{
// more than the capacity, reallocate the resources
if
(
pWindowResInfo
->
size
>=
pWindowResInfo
->
capacity
)
{
int64_t
newCap
=
pWindowResInfo
->
capacity
*
2
;
char
*
t
=
realloc
(
pWindowResInfo
->
pResult
,
newCap
*
sizeof
(
SWindowResult
));
if
(
t
!=
NULL
)
{
pWindowResInfo
->
pResult
=
(
SWindowResult
*
)
t
;
memset
(
&
pWindowResInfo
->
pResult
[
pWindowResInfo
->
capacity
],
0
,
sizeof
(
SWindowResult
)
*
pWindowResInfo
->
capacity
);
}
else
{
// todo
}
}
else
{
if
(
masterscan
)
{
// more than the capacity, reallocate the resources
if
(
pWindowResInfo
->
size
>=
pWindowResInfo
->
capacity
)
{
int64_t
newCap
=
pWindowResInfo
->
capacity
*
2
;
char
*
t
=
realloc
(
pWindowResInfo
->
pResult
,
newCap
*
sizeof
(
SWindowResult
));
if
(
t
!=
NULL
)
{
pWindowResInfo
->
pResult
=
(
SWindowResult
*
)
t
;
memset
(
&
pWindowResInfo
->
pResult
[
pWindowResInfo
->
capacity
],
0
,
sizeof
(
SWindowResult
)
*
pWindowResInfo
->
capacity
);
}
else
{
// todo
}
for
(
int32_t
i
=
pWindowResInfo
->
capacity
;
i
<
newCap
;
++
i
)
{
SPosInfo
pos
=
{
-
1
,
-
1
};
createQueryResultInfo
(
pQuery
,
&
pWindowResInfo
->
pResult
[
i
],
pRuntimeEnv
->
stableQuery
,
&
pos
);
for
(
int32_t
i
=
pWindowResInfo
->
capacity
;
i
<
newCap
;
++
i
)
{
SPosInfo
pos
=
{
-
1
,
-
1
};
createQueryResultInfo
(
pQuery
,
&
pWindowResInfo
->
pResult
[
i
],
pRuntimeEnv
->
stableQuery
,
&
pos
);
}
pWindowResInfo
->
capacity
=
newCap
;
}
pWindowResInfo
->
capacity
=
newCap
;
}
// add a new result set for a new group
pWindowResInfo
->
curIndex
=
pWindowResInfo
->
size
++
;
taosHashPut
(
pWindowResInfo
->
hashList
,
pData
,
bytes
,
(
char
*
)
&
pWindowResInfo
->
curIndex
,
sizeof
(
int32_t
));
// add a new result set for a new group
pWindowResInfo
->
curIndex
=
pWindowResInfo
->
size
++
;
taosHashPut
(
pWindowResInfo
->
hashList
,
pData
,
bytes
,
(
char
*
)
&
pWindowResInfo
->
curIndex
,
sizeof
(
int32_t
));
}
else
{
return
NULL
;
}
}
return
getWindowResult
(
pWindowResInfo
,
pWindowResInfo
->
curIndex
);
...
...
@@ -511,15 +516,19 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult
}
static
int32_t
setWindowOutputBufByKey
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SWindowResInfo
*
pWindowResInfo
,
int32_t
sid
,
STimeWindow
*
win
)
{
STimeWindow
*
win
,
bool
masterscan
,
bool
*
newWind
)
{
assert
(
win
->
skey
<=
win
->
ekey
);
SDiskbasedResultBuf
*
pResultBuf
=
pRuntimeEnv
->
pResultBuf
;
SWindowResult
*
pWindowRes
=
doSetTimeWindowFromKey
(
pRuntimeEnv
,
pWindowResInfo
,
(
char
*
)
&
win
->
skey
,
TSDB_KEYSIZE
);
SWindowResult
*
pWindowRes
=
doSetTimeWindowFromKey
(
pRuntimeEnv
,
pWindowResInfo
,
(
char
*
)
&
win
->
skey
,
TSDB_KEYSIZE
,
masterscan
);
if
(
pWindowRes
==
NULL
)
{
return
-
1
;
*
newWind
=
false
;
return
masterscan
?
-
1
:
0
;
}
*
newWind
=
true
;
// not assign result buffer yet, add new result buffer
if
(
pWindowRes
->
pos
.
pageId
==
-
1
)
{
int32_t
ret
=
addNewWindowResultBuf
(
pWindowRes
,
pResultBuf
,
sid
,
pRuntimeEnv
->
numOfRowsPerPage
);
...
...
@@ -563,7 +572,7 @@ static int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t sea
*/
static
int32_t
doCheckQueryCompleted
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
TSKEY
lastKey
,
SWindowResInfo
*
pWindowResInfo
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
if
(
pRuntimeEnv
->
scanFlag
!=
MASTER_SCAN
||
(
!
isIntervalQuery
(
pQuery
)))
{
if
(
pRuntimeEnv
->
scanFlag
!=
MASTER_SCAN
||
(
!
QUERY_IS_INTERVAL_QUERY
(
pQuery
)))
{
return
pWindowResInfo
->
size
;
}
...
...
@@ -686,24 +695,26 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQLFunctionCtx
*
pCtx
=
pRuntimeEnv
->
pCtx
;
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutput
;
++
k
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
k
].
base
.
functionId
;
if
(
IS_MASTER_SCAN
(
pRuntimeEnv
)
||
pStatus
->
closed
)
{
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutput
;
++
k
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
k
].
base
.
functionId
;
pCtx
[
k
].
nStartQueryTimestamp
=
pWin
->
skey
;
pCtx
[
k
].
size
=
forwardStep
;
pCtx
[
k
].
startOffset
=
(
QUERY_IS_ASC_QUERY
(
pQuery
))
?
offset
:
offset
-
(
forwardStep
-
1
);
pCtx
[
k
].
nStartQueryTimestamp
=
pWin
->
skey
;
pCtx
[
k
].
size
=
forwardStep
;
pCtx
[
k
].
startOffset
=
(
QUERY_IS_ASC_QUERY
(
pQuery
))
?
offset
:
offset
-
(
forwardStep
-
1
);
if
((
aAggs
[
functionId
].
nStatus
&
TSDB_FUNCSTATE_SELECTIVITY
)
!=
0
)
{
pCtx
[
k
].
ptsList
=
&
tsBuf
[
offset
];
}
if
((
aAggs
[
functionId
].
nStatus
&
TSDB_FUNCSTATE_SELECTIVITY
)
!=
0
)
{
pCtx
[
k
].
ptsList
=
&
tsBuf
[
offset
];
}
// not a whole block involved in query processing, statistics data can not be used
if
(
forwardStep
!=
numOfTotal
)
{
pCtx
[
k
].
preAggVals
.
isSet
=
false
;
}
// not a whole block involved in query processing, statistics data can not be used
if
(
forwardStep
!=
numOfTotal
)
{
pCtx
[
k
].
preAggVals
.
isSet
=
false
;
}
if
(
functionNeedToExecute
(
pRuntimeEnv
,
&
pCtx
[
k
],
functionId
))
{
aAggs
[
functionId
].
xFunction
(
&
pCtx
[
k
]);
if
(
functionNeedToExecute
(
pRuntimeEnv
,
&
pCtx
[
k
],
functionId
))
{
aAggs
[
functionId
].
xFunction
(
&
pCtx
[
k
]);
}
}
}
}
...
...
@@ -713,12 +724,14 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQLFunctionCtx
*
pCtx
=
pRuntimeEnv
->
pCtx
;
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutput
;
++
k
)
{
pCtx
[
k
].
nStartQueryTimestamp
=
pWin
->
skey
;
if
(
IS_MASTER_SCAN
(
pRuntimeEnv
)
||
pStatus
->
closed
)
{
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutput
;
++
k
)
{
pCtx
[
k
].
nStartQueryTimestamp
=
pWin
->
skey
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
k
].
base
.
functionId
;
if
(
functionNeedToExecute
(
pRuntimeEnv
,
&
pCtx
[
k
],
functionId
))
{
aAggs
[
functionId
].
xFunctionF
(
&
pCtx
[
k
],
offset
);
int32_t
functionId
=
pQuery
->
pSelectExpr
[
k
].
base
.
functionId
;
if
(
functionNeedToExecute
(
pRuntimeEnv
,
&
pCtx
[
k
],
functionId
))
{
aAggs
[
functionId
].
xFunctionF
(
&
pCtx
[
k
],
offset
);
}
}
}
}
...
...
@@ -880,7 +893,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
SDataBlockInfo
*
pDataBlockInfo
,
SWindowResInfo
*
pWindowResInfo
,
__block_search_fn_t
searchFn
,
SArray
*
pDataBlock
)
{
SQLFunctionCtx
*
pCtx
=
pRuntimeEnv
->
pCtx
;
bool
masterScan
=
IS_MASTER_SCAN
(
pRuntimeEnv
);
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
TSKEY
*
tsCols
=
NULL
;
if
(
pDataBlock
!=
NULL
)
{
...
...
@@ -903,18 +917,21 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
int32_t
offset
=
GET_COL_DATA_POS
(
pQuery
,
0
,
step
);
TSKEY
ts
=
tsCols
[
offset
];
bool
hasTimeWindow
=
false
;
STimeWindow
win
=
getActiveTimeWindow
(
pWindowResInfo
,
ts
,
pQuery
);
if
(
setWindowOutputBufByKey
(
pRuntimeEnv
,
pWindowResInfo
,
pDataBlockInfo
->
tid
,
&
win
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
setWindowOutputBufByKey
(
pRuntimeEnv
,
pWindowResInfo
,
pDataBlockInfo
->
tid
,
&
win
,
masterScan
,
&
hasTimeWindow
)
!=
TSDB_CODE_SUCCESS
)
{
tfree
(
sasArray
);
return
;
}
TSKEY
ekey
=
reviseWindowEkey
(
pQuery
,
&
win
);
int32_t
forwardStep
=
getNumOfRowsInTimeWindow
(
pQuery
,
pDataBlockInfo
,
tsCols
,
pQuery
->
pos
,
ekey
,
searchFn
,
true
);
if
(
hasTimeWindow
)
{
TSKEY
ekey
=
reviseWindowEkey
(
pQuery
,
&
win
);
int32_t
forwardStep
=
getNumOfRowsInTimeWindow
(
pQuery
,
pDataBlockInfo
,
tsCols
,
pQuery
->
pos
,
ekey
,
searchFn
,
true
);
SWindowStatus
*
pStatus
=
getTimeWindowResStatus
(
pWindowResInfo
,
curTimeWindow
(
pWindowResInfo
));
doBlockwiseApplyFunctions
(
pRuntimeEnv
,
pStatus
,
&
win
,
pQuery
->
pos
,
forwardStep
,
tsCols
,
pDataBlockInfo
->
rows
);
SWindowStatus
*
pStatus
=
getTimeWindowResStatus
(
pWindowResInfo
,
curTimeWindow
(
pWindowResInfo
));
doBlockwiseApplyFunctions
(
pRuntimeEnv
,
pStatus
,
&
win
,
pQuery
->
pos
,
forwardStep
,
tsCols
,
pDataBlockInfo
->
rows
);
}
int32_t
index
=
pWindowResInfo
->
curIndex
;
STimeWindow
nextWin
=
win
;
...
...
@@ -926,14 +943,19 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
}
// null data, failed to allocate more memory buffer
if
(
setWindowOutputBufByKey
(
pRuntimeEnv
,
pWindowResInfo
,
pDataBlockInfo
->
tid
,
&
nextWin
)
!=
TSDB_CODE_SUCCESS
)
{
bool
hasTimeWindow
=
false
;
if
(
setWindowOutputBufByKey
(
pRuntimeEnv
,
pWindowResInfo
,
pDataBlockInfo
->
tid
,
&
nextWin
,
masterScan
,
&
hasTimeWindow
)
!=
TSDB_CODE_SUCCESS
)
{
break
;
}
ekey
=
reviseWindowEkey
(
pQuery
,
&
nextWin
);
forwardStep
=
getNumOfRowsInTimeWindow
(
pQuery
,
pDataBlockInfo
,
tsCols
,
startPos
,
ekey
,
searchFn
,
true
);
if
(
!
hasTimeWindow
)
{
continue
;
}
TSKEY
ekey
=
reviseWindowEkey
(
pQuery
,
&
nextWin
);
int32_t
forwardStep
=
getNumOfRowsInTimeWindow
(
pQuery
,
pDataBlockInfo
,
tsCols
,
startPos
,
ekey
,
searchFn
,
true
);
pStatus
=
getTimeWindowResStatus
(
pWindowResInfo
,
curTimeWindow
(
pWindowResInfo
));
SWindowStatus
*
pStatus
=
getTimeWindowResStatus
(
pWindowResInfo
,
curTimeWindow
(
pWindowResInfo
));
doBlockwiseApplyFunctions
(
pRuntimeEnv
,
pStatus
,
&
nextWin
,
startPos
,
forwardStep
,
tsCols
,
pDataBlockInfo
->
rows
);
}
...
...
@@ -983,7 +1005,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
}
// assert(pRuntimeEnv->windowResInfo.hashList->size <= 2);
SWindowResult
*
pWindowRes
=
doSetTimeWindowFromKey
(
pRuntimeEnv
,
&
pRuntimeEnv
->
windowResInfo
,
pData
,
bytes
);
SWindowResult
*
pWindowRes
=
doSetTimeWindowFromKey
(
pRuntimeEnv
,
&
pRuntimeEnv
->
windowResInfo
,
pData
,
bytes
,
true
);
if
(
pWindowRes
==
NULL
)
{
return
-
1
;
}
...
...
@@ -1113,6 +1135,7 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
static
void
rowwiseApplyFunctions
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SDataStatis
*
pStatis
,
SDataBlockInfo
*
pDataBlockInfo
,
SWindowResInfo
*
pWindowResInfo
,
SArray
*
pDataBlock
)
{
SQLFunctionCtx
*
pCtx
=
pRuntimeEnv
->
pCtx
;
bool
masterScan
=
IS_MASTER_SCAN
(
pRuntimeEnv
);
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
STableQueryInfo
*
item
=
pQuery
->
current
;
...
...
@@ -1179,16 +1202,21 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
}
// interval window query
if
(
isIntervalQuery
(
pQuery
))
{
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
// decide the time window according to the primary timestamp
int64_t
ts
=
tsCols
[
offset
];
STimeWindow
win
=
getActiveTimeWindow
(
pWindowResInfo
,
ts
,
pQuery
);
int32_t
ret
=
setWindowOutputBufByKey
(
pRuntimeEnv
,
pWindowResInfo
,
pDataBlockInfo
->
tid
,
&
win
);
bool
hasTimeWindow
=
false
;
int32_t
ret
=
setWindowOutputBufByKey
(
pRuntimeEnv
,
pWindowResInfo
,
pDataBlockInfo
->
tid
,
&
win
,
masterScan
,
&
hasTimeWindow
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
// null data, too many state code
continue
;
}
if
(
!
hasTimeWindow
)
{
continue
;
}
SWindowStatus
*
pStatus
=
getTimeWindowResStatus
(
pWindowResInfo
,
curTimeWindow
(
pWindowResInfo
));
doRowwiseApplyFunctions
(
pRuntimeEnv
,
pStatus
,
&
win
,
offset
);
...
...
@@ -1208,12 +1236,15 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
}
// null data, failed to allocate more memory buffer
if
(
setWindowOutputBufByKey
(
pRuntimeEnv
,
pWindowResInfo
,
pDataBlockInfo
->
tid
,
&
nextWin
)
!=
TSDB_CODE_SUCCESS
)
{
bool
hasTimeWindow
=
false
;
if
(
setWindowOutputBufByKey
(
pRuntimeEnv
,
pWindowResInfo
,
pDataBlockInfo
->
tid
,
&
nextWin
,
masterScan
,
&
hasTimeWindow
)
!=
TSDB_CODE_SUCCESS
)
{
break
;
}
pStatus
=
getTimeWindowResStatus
(
pWindowResInfo
,
curTimeWindow
(
pWindowResInfo
));
doRowwiseApplyFunctions
(
pRuntimeEnv
,
pStatus
,
&
nextWin
,
offset
);
if
(
hasTimeWindow
)
{
pStatus
=
getTimeWindowResStatus
(
pWindowResInfo
,
curTimeWindow
(
pWindowResInfo
));
doRowwiseApplyFunctions
(
pRuntimeEnv
,
pStatus
,
&
nextWin
,
offset
);
}
}
pWindowResInfo
->
curIndex
=
index
;
...
...
@@ -1283,7 +1314,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
// interval query with limit applied
int32_t
numOfRes
=
0
;
if
(
isIntervalQuery
(
pQuery
))
{
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
numOfRes
=
doCheckQueryCompleted
(
pRuntimeEnv
,
lastKey
,
pWindowResInfo
);
}
else
{
numOfRes
=
getNumOfResult
(
pRuntimeEnv
);
...
...
@@ -1869,7 +1900,7 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) {
if
(
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
))
{
num
=
128
;
}
else
if
(
isIntervalQuery
(
pQuery
))
{
// time window query, allocate one page for each table
}
else
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
// time window query, allocate one page for each table
size_t
s
=
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
;
num
=
MAX
(
s
,
INITIAL_RESULT_ROWS_VALUE
);
}
else
{
// for super table query, one page for each subset
...
...
@@ -2019,7 +2050,7 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle,
r
|=
aAggs
[
functionId
].
dataReqFunc
(
&
pRuntimeEnv
->
pCtx
[
i
],
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
colId
);
}
if
(
pRuntimeEnv
->
pTSBuf
>
0
||
isIntervalQuery
(
pQuery
))
{
if
(
pRuntimeEnv
->
pTSBuf
>
0
||
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
r
|=
BLK_DATA_ALL_NEEDED
;
}
}
...
...
@@ -2173,6 +2204,7 @@ static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pB
if
(
tmp
==
NULL
)
{
// todo handle the oom
assert
(
0
);
}
else
{
memset
(
tmp
+
sizeof
(
tFilePage
)
+
bytes
*
pRec
->
rows
,
0
,
(
newSize
-
pRec
->
rows
)
*
bytes
);
pQuery
->
sdata
[
i
]
=
(
tFilePage
*
)
tmp
;
}
...
...
@@ -2203,13 +2235,15 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
pQuery
->
order
.
order
);
TsdbQueryHandleT
pQueryHandle
=
IS_MASTER_SCAN
(
pRuntimeEnv
)
?
pRuntimeEnv
->
pQueryHandle
:
pRuntimeEnv
->
pSecQueryHandle
;
SDataBlockInfo
blockInfo
=
SDATA_BLOCK_INITIALIZER
;
while
(
tsdbNextDataBlock
(
pQueryHandle
))
{
summary
->
totalBlocks
+=
1
;
if
(
isQueryKilled
(
GET_QINFO_ADDR
(
pRuntimeEnv
)))
{
return
0
;
}
SDataBlockInfo
blockInfo
=
tsdbRetrieveDataBlockInfo
(
pQueryHandle
);
tsdbRetrieveDataBlockInfo
(
pQueryHandle
,
&
blockInfo
);
// todo extract methods
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
)
&&
pRuntimeEnv
->
windowResInfo
.
prevSKey
==
TSKEY_INITIAL_VAL
)
{
...
...
@@ -2914,7 +2948,7 @@ void disableFuncInReverseScan(SQInfo *pQInfo) {
// group by normal columns and interval query on normal table
SWindowResInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
windowResInfo
;
if
(
pRuntimeEnv
->
groupbyNormalCol
||
isIntervalQuery
(
pQuery
))
{
if
(
pRuntimeEnv
->
groupbyNormalCol
||
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
disableFuncInReverseScanImpl
(
pQInfo
,
pWindowResInfo
,
order
);
}
else
{
// for simple result of table query,
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
// todo refactor
...
...
@@ -3089,7 +3123,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
bool
toContinue
=
false
;
if
(
pRuntimeEnv
->
groupbyNormalCol
||
isIntervalQuery
(
pQuery
))
{
if
(
pRuntimeEnv
->
groupbyNormalCol
||
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
// for each group result, call the finalize function for each column
SWindowResInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
windowResInfo
;
...
...
@@ -3281,7 +3315,7 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
void
finalizeQueryResult
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
if
(
pRuntimeEnv
->
groupbyNormalCol
||
isIntervalQuery
(
pQuery
))
{
if
(
pRuntimeEnv
->
groupbyNormalCol
||
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
// for each group result, call the finalize function for each column
SWindowResInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
windowResInfo
;
if
(
pRuntimeEnv
->
groupbyNormalCol
)
{
...
...
@@ -3327,9 +3361,9 @@ static bool hasMainOutput(SQuery *pQuery) {
}
static
STableQueryInfo
*
createTableQueryInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
void
*
pTable
,
STimeWindow
win
,
void
*
buf
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
STableQueryInfo
*
pTableQueryInfo
=
buf
;
//calloc(1, sizeof(STableQueryInfo));
STableQueryInfo
*
pTableQueryInfo
=
buf
;
pTableQueryInfo
->
win
=
win
;
pTableQueryInfo
->
lastKey
=
win
.
skey
;
...
...
@@ -3337,16 +3371,14 @@ static STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, void
pTableQueryInfo
->
pTable
=
pTable
;
pTableQueryInfo
->
cur
.
vgroupIndex
=
-
1
;
int32_t
initialSize
=
1
;
int32_t
initialThreshold
=
1
;
// set more initial size of interval/groupby query
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
)
||
pRuntimeEnv
->
groupbyNormalCol
)
{
initialSize
=
20
;
initialThreshold
=
100
;
int32_t
initialSize
=
20
;
int32_t
initialThreshold
=
100
;
initWindowResInfo
(
&
pTableQueryInfo
->
windowResInfo
,
pRuntimeEnv
,
initialSize
,
initialThreshold
,
TSDB_DATA_TYPE_INT
);
}
else
{
// in other aggregate query, do not initialize the windowResInfo
}
initWindowResInfo
(
&
pTableQueryInfo
->
windowResInfo
,
pRuntimeEnv
,
initialSize
,
initialThreshold
,
TSDB_DATA_TYPE_INT
);
return
pTableQueryInfo
;
}
...
...
@@ -3388,7 +3420,8 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
}
int32_t
GROUPRESULTID
=
1
;
SWindowResult
*
pWindowRes
=
doSetTimeWindowFromKey
(
pRuntimeEnv
,
pWindowResInfo
,
(
char
*
)
&
groupIndex
,
sizeof
(
groupIndex
));
SWindowResult
*
pWindowRes
=
doSetTimeWindowFromKey
(
pRuntimeEnv
,
pWindowResInfo
,
(
char
*
)
&
groupIndex
,
sizeof
(
groupIndex
),
true
);
if
(
pWindowRes
==
NULL
)
{
return
;
}
...
...
@@ -3570,7 +3603,7 @@ static int32_t getNumOfSubset(SQInfo *pQInfo) {
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
int32_t
totalSubset
=
0
;
if
(
pQInfo
->
runtimeEnv
.
groupbyNormalCol
||
(
isIntervalQuery
(
pQuery
)))
{
if
(
pQInfo
->
runtimeEnv
.
groupbyNormalCol
||
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
)))
{
totalSubset
=
numOfClosedTimeWindow
(
&
pQInfo
->
runtimeEnv
.
windowResInfo
);
}
else
{
totalSubset
=
GET_NUM_OF_TABLEGROUP
(
pQInfo
);
...
...
@@ -3735,7 +3768,7 @@ bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) {
}
else
{
// there are results waiting for returned to client.
if
(
Q_STATUS_EQUAL
(
pQuery
->
status
,
QUERY_COMPLETED
)
&&
(
pRuntimeEnv
->
groupbyNormalCol
||
isIntervalQuery
(
pQuery
))
&&
(
pRuntimeEnv
->
groupbyNormalCol
||
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
&&
(
pRuntimeEnv
->
windowResInfo
.
size
>
0
))
{
return
true
;
}
...
...
@@ -3918,12 +3951,13 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
STableQueryInfo
*
pTableQueryInfo
=
pQuery
->
current
;
TsdbQueryHandleT
pQueryHandle
=
pRuntimeEnv
->
pQueryHandle
;
SDataBlockInfo
blockInfo
=
SDATA_BLOCK_INITIALIZER
;
while
(
tsdbNextDataBlock
(
pQueryHandle
))
{
if
(
isQueryKilled
(
GET_QINFO_ADDR
(
pRuntimeEnv
)))
{
return
;
}
SDataBlockInfo
blockInfo
=
tsdbRetrieveDataBlockInfo
(
pQueryHandle
);
tsdbRetrieveDataBlockInfo
(
pQueryHandle
,
&
blockInfo
);
if
(
pQuery
->
limit
.
offset
>
blockInfo
.
rows
)
{
pQuery
->
limit
.
offset
-=
blockInfo
.
rows
;
...
...
@@ -3960,8 +3994,9 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
SWindowResInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
windowResInfo
;
STableQueryInfo
*
pTableQueryInfo
=
pQuery
->
current
;
SDataBlockInfo
blockInfo
=
SDATA_BLOCK_INITIALIZER
;
while
(
tsdbNextDataBlock
(
pRuntimeEnv
->
pQueryHandle
))
{
SDataBlockInfo
blockInfo
=
tsdbRetrieveDataBlockInfo
(
pRuntimeEnv
->
pQueryHandle
);
tsdbRetrieveDataBlockInfo
(
pRuntimeEnv
->
pQueryHandle
,
&
blockInfo
);
if
(
QUERY_IS_ASC_QUERY
(
pQuery
))
{
if
(
pWindowResInfo
->
prevSKey
==
TSKEY_INITIAL_VAL
)
{
...
...
@@ -4067,7 +4102,7 @@ static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
return
;
}
if
(
isSTableQuery
&&
(
!
isIntervalQuery
(
pQuery
))
&&
(
!
isFixedOutputQuery
(
pQuery
)))
{
if
(
isSTableQuery
&&
(
!
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
&&
(
!
isFixedOutputQuery
(
pQuery
)))
{
return
;
}
...
...
@@ -4081,7 +4116,7 @@ static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
if
(
!
isSTableQuery
&&
(
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
==
1
)
&&
(
cond
.
order
==
TSDB_ORDER_ASC
)
&&
(
!
isIntervalQuery
(
pQuery
))
&&
(
!
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
&&
(
!
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
))
&&
(
!
isFixedOutputQuery
(
pQuery
))
)
{
...
...
@@ -4174,7 +4209,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
initWindowResInfo
(
&
pRuntimeEnv
->
windowResInfo
,
pRuntimeEnv
,
512
,
4096
,
type
);
}
}
else
if
(
pRuntimeEnv
->
groupbyNormalCol
||
isIntervalQuery
(
pQuery
))
{
}
else
if
(
pRuntimeEnv
->
groupbyNormalCol
||
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
int32_t
rows
=
getInitialPageNum
(
pQInfo
);
code
=
createDiskbasedResultBuffer
(
&
pRuntimeEnv
->
pResultBuf
,
rows
,
pQuery
->
rowSize
,
pQInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -4225,13 +4260,15 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
int64_t
st
=
taosGetTimestampMs
();
TsdbQueryHandleT
pQueryHandle
=
IS_MASTER_SCAN
(
pRuntimeEnv
)
?
pRuntimeEnv
->
pQueryHandle
:
pRuntimeEnv
->
pSecQueryHandle
;
SDataBlockInfo
blockInfo
=
SDATA_BLOCK_INITIALIZER
;
while
(
tsdbNextDataBlock
(
pQueryHandle
))
{
summary
->
totalBlocks
+=
1
;
if
(
isQueryKilled
(
pQInfo
))
{
break
;
}
SDataBlockInfo
blockInfo
=
tsdbRetrieveDataBlockInfo
(
pQueryHandle
);
tsdbRetrieveDataBlockInfo
(
pQueryHandle
,
&
blockInfo
);
STableQueryInfo
**
pTableQueryInfo
=
(
STableQueryInfo
**
)
taosHashGet
(
pQInfo
->
tableqinfoGroupInfo
.
map
,
&
blockInfo
.
tid
,
sizeof
(
blockInfo
.
tid
));
if
(
pTableQueryInfo
==
NULL
)
{
break
;
...
...
@@ -4244,7 +4281,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
SArray
*
pDataBlock
=
loadDataBlockOnDemand
(
pRuntimeEnv
,
pQueryHandle
,
&
blockInfo
,
&
pStatis
);
if
(
!
pRuntimeEnv
->
groupbyNormalCol
)
{
if
(
!
isIntervalQuery
(
pQuery
))
{
if
(
!
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
int32_t
step
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
1
:-
1
;
setExecutionContext
(
pQInfo
,
(
*
pTableQueryInfo
)
->
groupIndex
,
blockInfo
.
window
.
ekey
+
step
);
}
else
{
// interval query
...
...
@@ -4655,7 +4692,7 @@ static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) {
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pQuery
->
order
.
order
);
if
(
isIntervalQuery
(
pQuery
))
{
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
size_t
numOfGroup
=
GET_NUM_OF_TABLEGROUP
(
pQInfo
);
for
(
int32_t
i
=
0
;
i
<
numOfGroup
;
++
i
)
{
SArray
*
group
=
GET_TABLEGROUP
(
pQInfo
,
i
);
...
...
@@ -4681,7 +4718,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
* if the groupIndex > 0, the query process must be completed yet, we only need to
* copy the data into output buffer
*/
if
(
isIntervalQuery
(
pQuery
))
{
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
copyResToQueryResultBuf
(
pQInfo
,
pQuery
);
#ifdef _DEBUG_VIEW
displayInterResult
(
pQuery
->
sdata
,
pRuntimeEnv
,
pQuery
->
sdata
[
0
]
->
num
);
...
...
@@ -4890,7 +4927,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
while
(
1
)
{
tableIntervalProcessImpl
(
pRuntimeEnv
,
newStartKey
);
if
(
isIntervalQuery
(
pQuery
))
{
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
pQInfo
->
groupIndex
=
0
;
// always start from 0
pQuery
->
rec
.
rows
=
0
;
copyFromWindowResToSData
(
pQInfo
,
pRuntimeEnv
->
windowResInfo
.
pResult
);
...
...
src/query/src/qUtil.c
浏览文件 @
7fc487b8
...
...
@@ -190,8 +190,7 @@ void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_
}
// get the result order
int32_t
resultOrder
=
(
pWindowResInfo
->
pResult
[
0
].
window
.
skey
<
pWindowResInfo
->
pResult
[
1
].
window
.
skey
)
?
TSDB_ORDER_ASC:
TSDB_ORDER_DESC
;
int32_t
resultOrder
=
(
pWindowResInfo
->
pResult
[
0
].
window
.
skey
<
pWindowResInfo
->
pResult
[
1
].
window
.
skey
)
?
1
:-
1
;
if
(
order
!=
resultOrder
)
{
return
;
...
...
src/query/src/qast.c
浏览文件 @
7fc487b8
...
...
@@ -13,25 +13,26 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tulog.h"
#include "tutil.h"
#include "tbuffer.h"
#include "tname.h"
#include "qast.h"
#include "tcompare.h"
#include "tsdb.h"
#include "exception.h"
#include "qsqlparser.h"
#include "qsyntaxtreefunction.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tarray.h"
#include "tbuffer.h"
#include "tcompare.h"
#include "tskiplist.h"
#include "tsqlfunction.h"
#include "tstoken.h"
#include "ttokendef.h"
#include "tschemautil.h"
#include "tarray.h"
#include "tskiplist.h"
#include "queryLog.h"
#include "tsdbMain.h"
#include "exception.h"
#include "tulog.h"
#include "tutil.h"
/*
*
...
...
@@ -327,104 +328,6 @@ static tExprNode *createSyntaxTree(SSchema *pSchema, int32_t numOfCols, char *st
}
}
void
tSQLBinaryExprFromString
(
tExprNode
**
pExpr
,
SSchema
*
pSchema
,
int32_t
numOfCols
,
char
*
src
,
int32_t
len
)
{
*
pExpr
=
NULL
;
if
(
len
<=
0
||
src
==
NULL
||
pSchema
==
NULL
||
numOfCols
<=
0
)
{
return
;
}
int32_t
pos
=
0
;
*
pExpr
=
createSyntaxTree
(
pSchema
,
numOfCols
,
src
,
&
pos
);
if
(
*
pExpr
!=
NULL
)
{
assert
((
*
pExpr
)
->
nodeType
==
TSQL_NODE_EXPR
);
}
}
int32_t
tSQLBinaryExprToStringImpl
(
tExprNode
*
pNode
,
char
*
dst
,
uint8_t
type
)
{
int32_t
len
=
0
;
if
(
type
==
TSQL_NODE_EXPR
)
{
*
dst
=
'('
;
tSQLBinaryExprToString
(
pNode
,
dst
+
1
,
&
len
);
len
+=
2
;
*
(
dst
+
len
-
1
)
=
')'
;
}
else
if
(
type
==
TSQL_NODE_COL
)
{
len
=
sprintf
(
dst
,
"%s"
,
pNode
->
pSchema
->
name
);
}
else
{
len
=
tVariantToString
(
pNode
->
pVal
,
dst
);
}
return
len
;
}
// TODO REFACTOR WITH SQL PARSER
static
char
*
tSQLOptrToString
(
uint8_t
optr
,
char
*
dst
)
{
switch
(
optr
)
{
case
TSDB_RELATION_LESS
:
{
*
dst
=
'<'
;
dst
+=
1
;
break
;
}
case
TSDB_RELATION_LESS_EQUAL
:
{
*
dst
=
'<'
;
*
(
dst
+
1
)
=
'='
;
dst
+=
2
;
break
;
}
case
TSDB_RELATION_EQUAL
:
{
*
dst
=
'='
;
dst
+=
1
;
break
;
}
case
TSDB_RELATION_GREATER
:
{
*
dst
=
'>'
;
dst
+=
1
;
break
;
}
case
TSDB_RELATION_GREATER_EQUAL
:
{
*
dst
=
'>'
;
*
(
dst
+
1
)
=
'='
;
dst
+=
2
;
break
;
}
case
TSDB_RELATION_NOT_EQUAL
:
{
*
dst
=
'<'
;
*
(
dst
+
1
)
=
'>'
;
dst
+=
2
;
break
;
}
case
TSDB_RELATION_OR
:
{
memcpy
(
dst
,
"or"
,
2
);
dst
+=
2
;
break
;
}
case
TSDB_RELATION_AND
:
{
memcpy
(
dst
,
"and"
,
3
);
dst
+=
3
;
break
;
}
default:
;
}
return
dst
;
}
void
tSQLBinaryExprToString
(
tExprNode
*
pExpr
,
char
*
dst
,
int32_t
*
len
)
{
if
(
pExpr
==
NULL
)
{
*
dst
=
0
;
*
len
=
0
;
return
;
}
int32_t
lhs
=
tSQLBinaryExprToStringImpl
(
pExpr
->
_node
.
pLeft
,
dst
,
pExpr
->
_node
.
pLeft
->
nodeType
);
dst
+=
lhs
;
*
len
=
lhs
;
char
*
start
=
tSQLOptrToString
(
pExpr
->
_node
.
optr
,
dst
);
*
len
+=
(
start
-
dst
);
*
len
+=
tSQLBinaryExprToStringImpl
(
pExpr
->
_node
.
pRight
,
start
,
pExpr
->
_node
.
pRight
->
nodeType
);
}
static
void
UNUSED_FUNC
destroySyntaxTree
(
tExprNode
*
pNode
)
{
tExprNodeDestroy
(
pNode
,
NULL
);
}
void
tExprNodeDestroy
(
tExprNode
*
pNode
,
void
(
*
fp
)(
void
*
))
{
...
...
@@ -773,8 +676,7 @@ static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo,
SSkipListNode
*
pNode
=
tSkipListIterGet
(
iter
);
char
*
pData
=
SL_GET_NODE_DATA
(
pNode
);
// todo refactor:
tstr
*
name
=
(
*
(
STable
**
)
pData
)
->
name
;
tstr
*
name
=
(
tstr
*
)
tsdbGetTableName
(
*
(
void
**
)
pData
);
// todo speed up by using hash
if
(
pQueryInfo
->
colIndex
==
TSDB_TBNAME_COLUMN_INDEX
)
{
if
(
pQueryInfo
->
optr
==
TSDB_RELATION_IN
)
{
...
...
@@ -976,27 +878,27 @@ void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput,
free
(
pRightOutput
);
}
void
tSQLBinaryExprTrv
(
tExprNode
*
pExprs
,
SArray
*
res
)
{
if
(
pExprs
==
NULL
)
{
return
;
}
tExprNode
*
pLeft
=
pExprs
->
_node
.
pLeft
;
tExprNode
*
pRight
=
pExprs
->
_node
.
pRight
;
// recursive traverse left child branch
if
(
pLeft
->
nodeType
==
TSQL_NODE_EXPR
)
{
tSQLBinaryExprTrv
(
pLeft
,
res
);
}
else
if
(
pLeft
->
nodeType
==
TSQL_NODE_COL
)
{
taosArrayPush
(
res
,
&
pLeft
->
pSchema
->
colId
);
}
if
(
pRight
->
nodeType
==
TSQL_NODE_EXPR
)
{
tSQLBinaryExprTrv
(
pRight
,
res
);
}
else
if
(
pRight
->
nodeType
==
TSQL_NODE_COL
)
{
taosArrayPush
(
res
,
&
pRight
->
pSchema
->
colId
);
}
}
//
void tSQLBinaryExprTrv(tExprNode *pExprs, SArray* res) {
//
if (pExprs == NULL) {
//
return;
//
}
//
//
tExprNode *pLeft = pExprs->_node.pLeft;
//
tExprNode *pRight = pExprs->_node.pRight;
//
//
// recursive traverse left child branch
//
if (pLeft->nodeType == TSQL_NODE_EXPR) {
//
tSQLBinaryExprTrv(pLeft, res);
//
} else if (pLeft->nodeType == TSQL_NODE_COL) {
//
taosArrayPush(res, &pLeft->pSchema->colId);
//
}
//
//
if (pRight->nodeType == TSQL_NODE_EXPR) {
//
tSQLBinaryExprTrv(pRight, res);
//
} else if (pRight->nodeType == TSQL_NODE_COL) {
//
taosArrayPush(res, &pRight->pSchema->colId);
//
}
//
}
static
void
exprTreeToBinaryImpl
(
SBufferWriter
*
bw
,
tExprNode
*
expr
)
{
tbufWriteUint8
(
bw
,
expr
->
nodeType
);
...
...
src/tsdb/src/tsdbMeta.c
浏览文件 @
7fc487b8
...
...
@@ -192,11 +192,6 @@ char *tsdbGetTableName(void* pTable) {
}
}
STableId
tsdbGetTableId
(
void
*
pTable
)
{
assert
(
pTable
);
return
((
STable
*
)
pTable
)
->
tableId
;
}
STableCfg
*
tsdbCreateTableCfgFromMsg
(
SMDCreateTableMsg
*
pMsg
)
{
if
(
pMsg
==
NULL
)
return
NULL
;
...
...
src/tsdb/src/tsdbRead.c
浏览文件 @
7fc487b8
...
...
@@ -22,7 +22,7 @@
#include "exception.h"
#include "../../../query/inc/qast.h" // todo move to common module
#include "
../../../query/inc/tlosertree.h" // todo move to util module
#include "
tlosertree.h"
#include "tsdb.h"
#include "tsdbMain.h"
...
...
@@ -122,7 +122,7 @@ static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
static
void
doMergeTwoLevelData
(
STsdbQueryHandle
*
pQueryHandle
,
STableCheckInfo
*
pCheckInfo
,
SCompBlock
*
pBlock
,
SArray
*
sa
);
static
int32_t
binarySearchForKey
(
char
*
pValue
,
int
num
,
TSKEY
key
,
int
order
);
static
int
tsdbReadRowsFromCache
(
STableCheckInfo
*
pCheckInfo
,
TSKEY
maxKey
,
int
maxRowsToRead
,
TSKEY
*
skey
,
TSKEY
*
ekey
,
static
int
tsdbReadRowsFromCache
(
STableCheckInfo
*
pCheckInfo
,
TSKEY
maxKey
,
int
maxRowsToRead
,
STimeWindow
*
win
,
STsdbQueryHandle
*
pQueryHandle
);
static
void
tsdbInitDataBlockLoadInfo
(
SDataBlockLoadInfo
*
pBlockLoadInfo
)
{
...
...
@@ -249,8 +249,6 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
pCheckInfo
->
initBuf
=
true
;
int32_t
order
=
pHandle
->
order
;
// tsdbTakeMemSnapshot(pHandle->pTsdb, &pCheckInfo->mem, &pCheckInfo->imem);
// no data in buffer, abort
if
(
pHandle
->
mem
==
NULL
&&
pHandle
->
imem
==
NULL
)
{
return
false
;
...
...
@@ -392,8 +390,11 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
STable
*
pTable
=
pCheckInfo
->
pTableObj
;
assert
(
pTable
!=
NULL
);
initTableMemIterator
(
pHandle
,
pCheckInfo
);
if
(
!
pCheckInfo
->
initBuf
)
{
initTableMemIterator
(
pHandle
,
pCheckInfo
);
}
SDataRow
row
=
getSDataRowInTableMem
(
pCheckInfo
);
if
(
row
==
NULL
)
{
return
false
;
...
...
@@ -411,8 +412,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
int32_t
step
=
ASCENDING_TRAVERSE
(
pHandle
->
order
)
?
1
:-
1
;
STimeWindow
*
win
=
&
pHandle
->
cur
.
win
;
pHandle
->
cur
.
rows
=
tsdbReadRowsFromCache
(
pCheckInfo
,
pHandle
->
window
.
ekey
,
pHandle
->
outputCapacity
,
&
win
->
skey
,
&
win
->
ekey
,
pHandle
);
// todo refactor API
pHandle
->
cur
.
rows
=
tsdbReadRowsFromCache
(
pCheckInfo
,
pHandle
->
window
.
ekey
,
pHandle
->
outputCapacity
,
win
,
pHandle
);
// update the last key value
pCheckInfo
->
lastKey
=
win
->
ekey
+
step
;
...
...
@@ -576,6 +576,8 @@ static SArray* getDefaultLoadColumns(STsdbQueryHandle* pQueryHandle, bool loadTS
static
bool
doLoadFileDataBlock
(
STsdbQueryHandle
*
pQueryHandle
,
SCompBlock
*
pBlock
,
STableCheckInfo
*
pCheckInfo
)
{
STsdbRepo
*
pRepo
=
pQueryHandle
->
pTsdb
;
// TODO refactor
SCompData
*
data
=
calloc
(
1
,
sizeof
(
SCompData
)
+
sizeof
(
SCompCol
)
*
pBlock
->
numOfCols
);
data
->
numOfCols
=
pBlock
->
numOfCols
;
...
...
@@ -608,8 +610,9 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
}
SDataCols
*
pCols
=
pQueryHandle
->
rhelper
.
pDataCols
[
0
];
assert
(
pCols
->
numOfRows
!=
0
);
assert
(
pCols
->
numOfRows
!=
0
&&
pCols
->
numOfRows
<=
pBlock
->
numOfRows
);
pBlock
->
numOfRows
=
pCols
->
numOfRows
;
taosArrayDestroy
(
sa
);
tfree
(
data
);
...
...
@@ -639,7 +642,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
int32_t
step
=
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)
?
1
:
-
1
;
cur
->
rows
=
tsdbReadRowsFromCache
(
pCheckInfo
,
binfo
.
window
.
skey
-
step
,
pQueryHandle
->
outputCapacity
,
&
cur
->
win
.
skey
,
&
cur
->
win
.
ekey
,
pQueryHandle
);
pQueryHandle
->
outputCapacity
,
&
cur
->
win
,
pQueryHandle
);
pQueryHandle
->
realNumOfRows
=
cur
->
rows
;
// update the last key value
...
...
@@ -1240,7 +1243,6 @@ static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void*
// assert(pLeftBlockInfoEx->compBlock->offset != pRightBlockInfoEx->compBlock->offset);
if
(
pLeftBlockInfoEx
->
compBlock
->
offset
==
pRightBlockInfoEx
->
compBlock
->
offset
&&
pLeftBlockInfoEx
->
compBlock
->
last
==
pRightBlockInfoEx
->
compBlock
->
last
)
{
// todo add more information
tsdbError
(
"error in header file, two block with same offset:%"
PRId64
,
(
int64_t
)
pLeftBlockInfoEx
->
compBlock
->
offset
);
}
...
...
@@ -1480,7 +1482,8 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
size_t
numOfTables
=
taosArrayGetSize
(
pQueryHandle
->
pTableCheckInfo
);
assert
(
numOfTables
>
0
);
SDataBlockInfo
blockInfo
=
{{
0
},
0
};
if
(
pQueryHandle
->
type
==
TSDB_QUERY_TYPE_EXTERNAL
)
{
pQueryHandle
->
type
=
TSDB_QUERY_TYPE_ALL
;
pQueryHandle
->
order
=
TSDB_ORDER_DESC
;
...
...
@@ -1490,7 +1493,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
}
SArray
*
sa
=
getDefaultLoadColumns
(
pQueryHandle
,
true
);
/*SDataBlockInfo* pBlockInfo =*/
tsdbRetrieveDataBlockInfo
(
pHandle
);
/*SDataBlockInfo* pBlockInfo =*/
tsdbRetrieveDataBlockInfo
(
pHandle
,
&
blockInfo
);
/*SArray *pDataBlock = */
tsdbRetrieveDataBlock
(
pHandle
,
sa
);
if
(
pQueryHandle
->
cur
.
win
.
ekey
==
pQueryHandle
->
window
.
skey
)
{
...
...
@@ -1561,7 +1564,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
bool
ret
=
tsdbNextDataBlock
((
void
*
)
pSecQueryHandle
);
assert
(
ret
);
/*SDataBlockInfo* pBlockInfo =*/
tsdbRetrieveDataBlockInfo
((
void
*
)
pSecQueryHandle
);
/*SDataBlockInfo* pBlockInfo =*/
tsdbRetrieveDataBlockInfo
((
void
*
)
pSecQueryHandle
,
&
blockInfo
);
/*SArray *pDataBlock = */
tsdbRetrieveDataBlock
((
void
*
)
pSecQueryHandle
,
sa
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
...
...
@@ -1696,11 +1699,11 @@ static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle) {
pQueryHandle
->
window
=
(
STimeWindow
)
{
info
.
lastKey
,
TSKEY_INITIAL_VAL
};
}
static
int
tsdbReadRowsFromCache
(
STableCheckInfo
*
pCheckInfo
,
TSKEY
maxKey
,
int
maxRowsToRead
,
TSKEY
*
skey
,
TSKEY
*
ekey
,
static
int
tsdbReadRowsFromCache
(
STableCheckInfo
*
pCheckInfo
,
TSKEY
maxKey
,
int
maxRowsToRead
,
STimeWindow
*
win
,
STsdbQueryHandle
*
pQueryHandle
)
{
int
numOfRows
=
0
;
int32_t
numOfCols
=
taosArrayGetSize
(
pQueryHandle
->
pColumns
);
*
skey
=
TSKEY_INITIAL_VAL
;
win
->
skey
=
TSKEY_INITIAL_VAL
;
int64_t
st
=
taosGetTimestampUs
();
STsdbMeta
*
pMeta
=
tsdbGetMeta
(
pQueryHandle
->
pTsdb
);
...
...
@@ -1720,11 +1723,11 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
break
;
}
if
(
*
skey
==
INT64_MIN
)
{
*
skey
=
key
;
if
(
win
->
skey
==
INT64_MIN
)
{
win
->
skey
=
key
;
}
*
ekey
=
key
;
win
->
ekey
=
key
;
copyOneRowFromMem
(
pQueryHandle
,
maxRowsToRead
,
numOfRows
,
row
,
pMeta
,
numOfCols
,
pTable
);
if
(
++
numOfRows
>=
maxRowsToRead
)
{
...
...
@@ -1753,7 +1756,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
return
numOfRows
;
}
SDataBlockInfo
tsdbRetrieveDataBlockInfo
(
TsdbQueryHandleT
*
pQueryHandle
)
{
void
tsdbRetrieveDataBlockInfo
(
TsdbQueryHandleT
*
pQueryHandle
,
SDataBlockInfo
*
pDataBlockInfo
)
{
STsdbQueryHandle
*
pHandle
=
(
STsdbQueryHandle
*
)
pQueryHandle
;
SQueryFilePos
*
cur
=
&
pHandle
->
cur
;
STable
*
pTable
=
NULL
;
...
...
@@ -1766,16 +1769,12 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) {
STableCheckInfo
*
pCheckInfo
=
taosArrayGet
(
pHandle
->
pTableCheckInfo
,
pHandle
->
activeIndex
);
pTable
=
pCheckInfo
->
pTableObj
;
}
SDataBlockInfo
blockInfo
=
{
.
uid
=
pTable
->
tableId
.
uid
,
.
tid
=
pTable
->
tableId
.
tid
,
.
rows
=
cur
->
rows
,
.
window
=
cur
->
win
,
.
numOfCols
=
QH_GET_NUM_OF_COLS
(
pHandle
),
};
return
blockInfo
;
pDataBlockInfo
->
uid
=
pTable
->
tableId
.
uid
;
pDataBlockInfo
->
tid
=
pTable
->
tableId
.
tid
;
pDataBlockInfo
->
rows
=
cur
->
rows
;
pDataBlockInfo
->
window
=
cur
->
win
;
pDataBlockInfo
->
numOfCols
=
QH_GET_NUM_OF_COLS
(
pHandle
);
}
/*
...
...
@@ -1977,9 +1976,9 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
int32_t
type
=
0
;
int32_t
bytes
=
0
;
if
(
colIndex
==
TSDB_TBNAME_COLUMN_INDEX
)
{
// todo refactor extract method , to queryExecutor to generate tags values
f1
=
(
char
*
)
pTable1
->
name
;
f2
=
(
char
*
)
pTable2
->
name
;
if
(
colIndex
==
TSDB_TBNAME_COLUMN_INDEX
)
{
f1
=
(
char
*
)
TABLE_NAME
(
pTable1
)
;
f2
=
(
char
*
)
TABLE_NAME
(
pTable2
)
;
type
=
TSDB_DATA_TYPE_BINARY
;
bytes
=
tGetTableNameColumnSchema
().
bytes
;
}
else
{
...
...
@@ -2088,13 +2087,17 @@ bool indexedNodeFilterFp(const void* pNode, void* param) {
char
*
val
=
NULL
;
if
(
pInfo
->
colIndex
==
TSDB_TBNAME_COLUMN_INDEX
)
{
val
=
(
char
*
)
pTable
->
name
;
val
=
(
char
*
)
TABLE_NAME
(
pTable
)
;
}
else
{
val
=
tdGetKVRowValOfCol
(
pTable
->
tagVal
,
pInfo
->
sch
.
colId
);
}
//todo :the val is possible to be null, so check it out carefully
int32_t
ret
=
pInfo
->
compare
(
val
,
pInfo
->
q
);
int32_t
ret
=
0
;
if
(
val
==
NULL
)
{
//the val is possible to be null, so check it out carefully
ret
=
-
1
;
// val is missing in table tags value pairs
}
else
{
ret
=
pInfo
->
compare
(
val
,
pInfo
->
q
);
}
switch
(
pInfo
->
optr
)
{
case
TSDB_RELATION_EQUAL
:
{
...
...
src/
query
/inc/tlosertree.h
→
src/
util
/inc/tlosertree.h
浏览文件 @
7fc487b8
文件已移动
src/
query
/src/tlosertree.c
→
src/
util
/src/tlosertree.c
浏览文件 @
7fc487b8
...
...
@@ -13,10 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tlosertree.h"
#include "os.h"
#include "taosmsg.h"
#include "tlosertree.h"
#include "queryLog.h"
#include "tulog.h"
// set initial value for loser tree
void
tLoserTreeInit
(
SLoserTreeInfo
*
pTree
)
{
...
...
@@ -45,7 +45,7 @@ uint32_t tLoserTreeCreate(SLoserTreeInfo** pTree, int32_t numOfEntries, void* pa
*
pTree
=
(
SLoserTreeInfo
*
)
calloc
(
1
,
sizeof
(
SLoserTreeInfo
)
+
sizeof
(
SLoserTreeNode
)
*
totalEntries
);
if
((
*
pTree
)
==
NULL
)
{
q
Error
(
"allocate memory for loser-tree failed. reason:%s"
,
strerror
(
errno
));
u
Error
(
"allocate memory for loser-tree failed. reason:%s"
,
strerror
(
errno
));
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
...
...
src/vnode/src/vnodeRead.c
浏览文件 @
7fc487b8
...
...
@@ -94,10 +94,10 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
qinfo_t
pQInfo
=
NULL
;
void
**
handle
=
NULL
;
if
(
contLen
!=
0
)
{
qinfo_t
pQInfo
=
NULL
;
code
=
qCreateQueryInfo
(
pVnode
->
tsdb
,
pVnode
->
vgId
,
pQueryTableMsg
,
pVnode
,
NULL
,
&
pQInfo
);
SQueryTableRsp
*
pRsp
=
(
SQueryTableRsp
*
)
rpcMallocCont
(
sizeof
(
SQueryTableRsp
));
...
...
@@ -116,19 +116,19 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
qKillQuery
(
pQInfo
);
qKillQuery
(
pQInfo
);
pQInfo
=
NULL
;
}
else
{
assert
(
*
handle
==
pQInfo
);
pRsp
->
qhandle
=
htobe64
((
uint64_t
)
(
handle
));
}
pQInfo
=
NULL
;
if
(
handle
!=
NULL
&&
vnodeNotifyCurrentQhandle
(
pReadMsg
->
rpcMsg
.
handle
,
handle
,
pVnode
->
vgId
)
!=
TSDB_CODE_SUCCESS
)
{
vError
(
"vgId:%d, QInfo:%p, query discarded since link is broken, %p"
,
pVnode
->
vgId
,
pQInfo
,
pReadMsg
->
rpcMsg
.
handle
);
vError
(
"vgId:%d, QInfo:%p, query discarded since link is broken, %p"
,
pVnode
->
vgId
,
*
handle
,
pReadMsg
->
rpcMsg
.
handle
);
pRsp
->
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
;
// NOTE: there two refcount, needs to kill twice
// query has not been put into qhandle pool, kill it directly.
qKillQuery
(
pQInfo
);
qKillQuery
(
*
handle
);
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
handle
,
true
);
return
pRsp
->
code
;
}
...
...
@@ -139,16 +139,18 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
vDebug
(
"vgId:%d, QInfo:%p, dnode query msg disposed"
,
vgId
,
pQInfo
);
}
else
{
assert
(
pCont
!=
NULL
);
pQInfo
=
*
(
void
**
)(
pCont
);
handle
=
pCont
;
code
=
TSDB_CODE_VND_ACTION_IN_PROGRESS
;
vDebug
(
"vgId:%d, QInfo:%p, dnode query msg in progress"
,
pVnode
->
vgId
,
pQInfo
);
handle
=
qAcquireQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
pCont
);
if
(
handle
==
NULL
)
{
vWarn
(
"QInfo:%p invalid qhandle in continuing exec query, conn:%p"
,
*
(
void
**
)
pCont
,
pReadMsg
->
rpcMsg
.
handle
);
code
=
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
else
{
vDebug
(
"vgId:%d, QInfo:%p, dnode query msg in progress"
,
pVnode
->
vgId
,
*
(
void
**
)
pCont
);
code
=
TSDB_CODE_VND_ACTION_IN_PROGRESS
;
}
}
if
(
pQInfo
!=
NULL
)
{
qTableQuery
(
pQInfo
);
// do execute query
assert
(
handle
!=
NULL
);
if
(
handle
!=
NULL
)
{
qTableQuery
(
*
handle
);
// do execute query
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
handle
,
false
);
}
...
...
@@ -160,57 +162,64 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
SRspRet
*
pRet
=
&
pReadMsg
->
rspRet
;
SRetrieveTableMsg
*
pRetrieve
=
pCont
;
void
**
pQInfo
=
(
void
*
)
htobe64
(
pRetrieve
->
qhandle
);
pRetrieve
->
qhandle
=
htobe64
(
pRetrieve
->
qhandle
);
pRetrieve
->
free
=
htons
(
pRetrieve
->
free
);
vDebug
(
"vgId:%d, QInfo:%p, retrieve msg is disposed"
,
pVnode
->
vgId
,
*
pQInfo
);
vDebug
(
"vgId:%d, QInfo:%p, retrieve msg is disposed"
,
pVnode
->
vgId
,
*
(
void
**
)
pRetrieve
->
qhandle
);
memset
(
pRet
,
0
,
sizeof
(
SRspRet
));
int32_t
ret
=
0
;
void
**
handle
=
qAcquireQInfo
(
pVnode
->
qMgmt
,
pQInfo
);
if
(
handle
==
NULL
||
handle
!=
pQInfo
)
{
ret
=
TSDB_CODE_QRY_INVALID_QHANDLE
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
void
**
handle
=
qAcquireQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
pRetrieve
->
qhandle
);
if
(
handle
==
NULL
||
handle
!=
(
void
**
)
pRetrieve
->
qhandle
)
{
code
=
TSDB_CODE_QRY_INVALID_QHANDLE
;
vDebug
(
"vgId:%d, invalid qhandle in fetch result, QInfo:%p"
,
pVnode
->
vgId
,
*
(
void
**
)
pRetrieve
->
qhandle
);
pRet
->
rsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableRsp
));
pRet
->
len
=
sizeof
(
SRetrieveTableRsp
);
memset
(
pRet
->
rsp
,
0
,
sizeof
(
SRetrieveTableRsp
));
SRetrieveTableRsp
*
pRsp
=
pRet
->
rsp
;
pRsp
->
numOfRows
=
0
;
pRsp
->
completed
=
true
;
pRsp
->
useconds
=
0
;
return
code
;
}
if
(
pRetrieve
->
free
==
1
)
{
if
(
ret
==
TSDB_CODE_SUCCESS
)
{
vDebug
(
"vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle"
,
pVnode
->
vgId
,
pQInfo
);
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
handle
,
true
);
pRet
->
rsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableRsp
));
pRet
->
len
=
sizeof
(
SRetrieveTableRsp
);
memset
(
pRet
->
rsp
,
0
,
sizeof
(
SRetrieveTableRsp
));
SRetrieveTableRsp
*
pRsp
=
pRet
->
rsp
;
pRsp
->
numOfRows
=
0
;
pRsp
->
completed
=
true
;
pRsp
->
useconds
=
0
;
}
else
{
// todo handle error
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
handle
,
true
);
}
return
ret
;
}
vDebug
(
"vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle"
,
pVnode
->
vgId
,
*
handle
);
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
handle
,
true
);
int32_t
code
=
qRetrieveQueryResultInfo
(
*
pQInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
||
ret
!=
TSDB_CODE_SUCCESS
)
{
//TODO
pRet
->
rsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableRsp
));
pRet
->
len
=
sizeof
(
SRetrieveTableRsp
);
memset
(
pRet
->
rsp
,
0
,
sizeof
(
SRetrieveTableRsp
));
SRetrieveTableRsp
*
pRsp
=
pRet
->
rsp
;
pRsp
->
numOfRows
=
0
;
pRsp
->
completed
=
true
;
pRsp
->
useconds
=
0
;
}
else
{
// todo check code and handle error in build result set
code
=
qDumpRetrieveResult
(
*
pQInfo
,
(
SRetrieveTableRsp
**
)
&
pRet
->
rsp
,
&
pRet
->
len
);
if
(
qHasMoreResultsToRetrieve
(
*
handle
))
{
dnodePutItemIntoReadQueue
(
pVnode
,
handle
);
pRet
->
qhandle
=
handle
;
code
=
TSDB_CODE_SUCCESS
;
}
else
{
// no further execution invoked, release the ref to vnode
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
handle
,
true
);
return
code
;
}
bool
freeHandle
=
true
;
code
=
qRetrieveQueryResultInfo
(
*
handle
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
//TODO handle malloc failure
pRet
->
rsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableRsp
));
memset
(
pRet
->
rsp
,
0
,
sizeof
(
SRetrieveTableRsp
));
}
else
{
// if failed to dump result, free qhandle immediately
if
((
code
=
qDumpRetrieveResult
(
*
handle
,
(
SRetrieveTableRsp
**
)
&
pRet
->
rsp
,
&
pRet
->
len
))
==
TSDB_CODE_SUCCESS
)
{
if
(
qHasMoreResultsToRetrieve
(
*
handle
))
{
dnodePutItemIntoReadQueue
(
pVnode
,
handle
);
pRet
->
qhandle
=
handle
;
freeHandle
=
false
;
}
}
}
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
handle
,
freeHandle
);
return
code
;
}
...
...
tests/script/general/parser/join.sim
浏览文件 @
7fc487b8
...
...
@@ -447,4 +447,15 @@ sql insert into um2 using m2 tags(9) values(1000001, 10)(2000000, 20);
sql_error select count(*) from m1,m2 where m1.a=m2.a and m1.ts=m2.ts;
#empty table join test, add for no result join test
sql create database ux1;
sql use ux1;
sql create table m1(ts timestamp, k int) tags(a binary(12), b int);
sql create table tm0 using m1 tags('abc', 1);
sql create table m2(ts timestamp, k int) tags(a int, b binary(12));
sql create table tm2 using m2 tags(2, 'abc');
sql select count(*) from tm0, tm2 where tm0.ts=tm2.ts;
sql select count(*) from m1, m2 where m1.ts=m2.ts and m1.b=m2.a
sql drop database ux1;
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录