Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
3612ba04
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
3612ba04
编写于
6月 13, 2020
作者:
S
Shengliang Guan
提交者:
GitHub
6月 13, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #2265 from taosdata/feature/query
Feature/query
上级
705e8178
fe2bd893
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
272 addition
and
128 deletion
+272
-128
src/client/src/tscFunctionImpl.c
src/client/src/tscFunctionImpl.c
+6
-7
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+24
-3
src/client/src/tscServer.c
src/client/src/tscServer.c
+2
-2
src/inc/query.h
src/inc/query.h
+2
-1
src/mnode/src/mnodeDb.c
src/mnode/src/mnodeDb.c
+6
-3
src/mnode/src/mnodeTable.c
src/mnode/src/mnodeTable.c
+6
-4
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+171
-67
src/query/src/qUtil.c
src/query/src/qUtil.c
+8
-4
src/vnode/src/vnodeRead.c
src/vnode/src/vnodeRead.c
+13
-3
tests/script/general/parser/groupby.sim
tests/script/general/parser/groupby.sim
+1
-1
tests/script/general/parser/import_commit3.sim
tests/script/general/parser/import_commit3.sim
+1
-1
tests/script/general/parser/testSuite.sim
tests/script/general/parser/testSuite.sim
+32
-32
未找到文件。
src/client/src/tscFunctionImpl.c
浏览文件 @
3612ba04
...
...
@@ -699,7 +699,7 @@ static int32_t first_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end,
}
static
int32_t
last_data_req_info
(
SQLFunctionCtx
*
pCtx
,
TSKEY
start
,
TSKEY
end
,
int32_t
colId
)
{
if
(
pCtx
->
order
==
TSDB_ORDER_ASC
)
{
if
(
pCtx
->
order
!=
pCtx
->
param
[
0
].
i64Key
)
{
return
BLK_DATA_NO_NEEDED
;
}
...
...
@@ -727,7 +727,7 @@ static int32_t first_dist_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY
}
static
int32_t
last_dist_data_req_info
(
SQLFunctionCtx
*
pCtx
,
TSKEY
start
,
TSKEY
end
,
int32_t
colId
)
{
if
(
pCtx
->
order
==
TSDB_ORDER_ASC
)
{
if
(
pCtx
->
order
!=
pCtx
->
param
[
0
].
i64Key
)
{
return
BLK_DATA_NO_NEEDED
;
}
...
...
@@ -1593,7 +1593,7 @@ static void first_dist_function_f(SQLFunctionCtx *pCtx, int32_t index) {
if
(
pCtx
->
hasNull
&&
isNull
(
pData
,
pCtx
->
inputType
))
{
return
;
}
if
(
pCtx
->
order
==
TSDB_ORDER_DESC
)
{
return
;
}
...
...
@@ -1652,7 +1652,7 @@ static void first_dist_func_second_merge(SQLFunctionCtx *pCtx) {
* least one data in this block that is not null.(TODO opt for this case)
*/
static
void
last_function
(
SQLFunctionCtx
*
pCtx
)
{
if
(
pCtx
->
order
==
TSDB_ORDER_ASC
)
{
if
(
pCtx
->
order
!=
pCtx
->
param
[
0
].
i64Key
)
{
return
;
}
...
...
@@ -1681,7 +1681,6 @@ static void last_function(SQLFunctionCtx *pCtx) {
}
static
void
last_function_f
(
SQLFunctionCtx
*
pCtx
,
int32_t
index
)
{
assert
(
pCtx
->
order
!=
TSDB_ORDER_ASC
);
void
*
pData
=
GET_INPUT_CHAR_INDEX
(
pCtx
,
index
);
if
(
pCtx
->
hasNull
&&
isNull
(
pData
,
pCtx
->
inputType
))
{
return
;
...
...
@@ -1725,7 +1724,7 @@ static void last_dist_function(SQLFunctionCtx *pCtx) {
* 1. for scan data in asc order, no need to check data
* 2. for data blocks that are not loaded, no need to check data
*/
if
(
pCtx
->
order
==
TSDB_ORDER_ASC
)
{
if
(
pCtx
->
order
!=
pCtx
->
param
[
0
].
i64Key
)
{
return
;
}
...
...
@@ -1763,7 +1762,7 @@ static void last_dist_function_f(SQLFunctionCtx *pCtx, int32_t index) {
* 1. for scan data in asc order, no need to check data
* 2. for data blocks that are not loaded, no need to check data
*/
if
(
pCtx
->
order
==
TSDB_ORDER_ASC
)
{
if
(
pCtx
->
order
!=
pCtx
->
param
[
0
].
i64Key
)
{
return
;
}
...
...
src/client/src/tscSQLParser.c
浏览文件 @
3612ba04
...
...
@@ -1452,6 +1452,13 @@ static int32_t setExprInfoForFunctions(SQueryInfo* pQueryInfo, SSchema* pSchema,
SSqlExpr
*
pExpr
=
tscSqlExprAppend
(
pQueryInfo
,
functionID
,
pColIndex
,
type
,
bytes
,
bytes
,
false
);
tstrncpy
(
pExpr
->
aliasName
,
columnName
,
sizeof
(
pExpr
->
aliasName
));
// set reverse order scan data blocks for last query
if
(
functionID
==
TSDB_FUNC_LAST
)
{
pExpr
->
numOfParams
=
1
;
pExpr
->
param
[
0
].
i64Key
=
TSDB_ORDER_DESC
;
pExpr
->
param
[
0
].
nType
=
TSDB_DATA_TYPE_INT
;
}
// for all queries, the timestamp column needs to be loaded
SColumnIndex
index
=
{.
tableIndex
=
pColIndex
->
tableIndex
,
.
columnIndex
=
PRIMARYKEY_TIMESTAMP_COL_INDEX
};
...
...
@@ -1724,6 +1731,22 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
if
(
setExprInfoForFunctions
(
pQueryInfo
,
pSchema
,
functionID
,
pItem
->
aliasName
,
colIndex
+
i
,
&
index
)
!=
0
)
{
return
TSDB_CODE_TSC_INVALID_SQL
;
}
if
(
optr
==
TK_LAST
)
{
// todo refactor
SSqlGroupbyExpr
*
pGroupBy
=
&
pQueryInfo
->
groupbyExpr
;
if
(
pGroupBy
->
numOfGroupCols
>
0
)
{
for
(
int32_t
k
=
0
;
k
<
pGroupBy
->
numOfGroupCols
;
++
k
)
{
SColIndex
*
pIndex
=
taosArrayGet
(
pGroupBy
->
columnInfo
,
k
);
if
(
!
TSDB_COL_IS_TAG
(
pIndex
->
flag
)
&&
pIndex
->
colIndex
<
tscGetNumOfColumns
(
pTableMetaInfo
->
pTableMeta
))
{
// group by normal columns
SSqlExpr
*
pExpr
=
taosArrayGetP
(
pQueryInfo
->
exprList
,
colIndex
+
i
);
pExpr
->
numOfParams
=
1
;
pExpr
->
param
->
i64Key
=
TSDB_ORDER_ASC
;
break
;
}
}
}
}
}
}
...
...
@@ -2586,9 +2609,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd*
tscColumnListInsert
(
pQueryInfo
->
colList
,
&
index
);
SColIndex
colIndex
=
{
.
colIndex
=
index
.
columnIndex
,
.
flag
=
TSDB_COL_NORMAL
,
.
colId
=
pSchema
->
colId
,
};
SColIndex
colIndex
=
{
.
colIndex
=
index
.
columnIndex
,
.
flag
=
TSDB_COL_NORMAL
,
.
colId
=
pSchema
->
colId
};
taosArrayPush
(
pGroupExpr
->
columnInfo
,
&
colIndex
);
pQueryInfo
->
groupbyExpr
.
orderType
=
TSDB_ORDER_ASC
;
...
...
src/client/src/tscServer.c
浏览文件 @
3612ba04
...
...
@@ -430,7 +430,7 @@ void tscKillSTableQuery(SSqlObj *pSql) {
/*
* 1. if the subqueries are not launched or partially launched, we need to waiting the launched
* query return to successfully free allocated resources.
* 2. if no any subqueries are launched yet, which means the
metric
query only in parse sql stage,
* 2. if no any subqueries are launched yet, which means the
super table
query only in parse sql stage,
* set the res.code, and return.
*/
const
int64_t
MAX_WAITING_TIME
=
10000
;
// 10 Sec.
...
...
@@ -2201,7 +2201,7 @@ int tscProcessDropTableRsp(SSqlObj *pSql) {
* The cached information is expired, however, we may have lost the ref of original meter. So, clear whole cache
* instead.
*/
tscTrace
(
"%p force release
meter
meta after drop table:%s"
,
pSql
,
pTableMetaInfo
->
name
);
tscTrace
(
"%p force release
table
meta after drop table:%s"
,
pSql
,
pTableMetaInfo
->
name
);
taosCacheRelease
(
tscCacheHandle
,
(
void
**
)
&
pTableMeta
,
true
);
if
(
pTableMetaInfo
->
pTableMeta
)
{
...
...
src/inc/query.h
浏览文件 @
3612ba04
...
...
@@ -70,7 +70,8 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo);
int32_t
qDumpRetrieveResult
(
qinfo_t
qinfo
,
SRetrieveTableRsp
**
pRsp
,
int32_t
*
contLen
);
/**
* Decide if more results will be produced or not
* Decide if more results will be produced or not, NOTE: this function will increase the ref count of QInfo,
* so it can be only called once for each retrieve
*
* @param qinfo
* @return
...
...
src/mnode/src/mnodeDb.c
浏览文件 @
3612ba04
...
...
@@ -84,9 +84,12 @@ static int32_t mnodeDbActionDelete(SSdbOper *pOper) {
mnodeDropAllChildTables
(
pDb
);
mnodeDropAllSuperTables
(
pDb
);
mnodeDropAllDbVgroups
(
pDb
);
mnodeDropDbFromAcct
(
pAcct
,
pDb
);
mnodeDecAcctRef
(
pAcct
);
if
(
pAcct
)
{
mnodeDropDbFromAcct
(
pAcct
,
pDb
);
mnodeDecAcctRef
(
pAcct
);
}
return
TSDB_CODE_SUCCESS
;
}
...
...
src/mnode/src/mnodeTable.c
浏览文件 @
3612ba04
...
...
@@ -157,10 +157,12 @@ static int32_t mnodeChildTableActionDelete(SSdbOper *pOper) {
if
(
pDb
!=
NULL
)
pAcct
=
mnodeGetAcct
(
pDb
->
acct
);
if
(
pTable
->
info
.
type
==
TSDB_CHILD_TABLE
)
{
grantRestore
(
TSDB_GRANT_TIMESERIES
,
pTable
->
superTable
->
numOfColumns
-
1
);
if
(
pAcct
!=
NULL
)
pAcct
->
acctInfo
.
numOfTimeSeries
-=
(
pTable
->
superTable
->
numOfColumns
-
1
);
mnodeRemoveTableFromStable
(
pTable
->
superTable
,
pTable
);
mnodeDecTableRef
(
pTable
->
superTable
);
if
(
pTable
->
superTable
)
{
grantRestore
(
TSDB_GRANT_TIMESERIES
,
pTable
->
superTable
->
numOfColumns
-
1
);
if
(
pAcct
!=
NULL
)
pAcct
->
acctInfo
.
numOfTimeSeries
-=
(
pTable
->
superTable
->
numOfColumns
-
1
);
mnodeRemoveTableFromStable
(
pTable
->
superTable
,
pTable
);
mnodeDecTableRef
(
pTable
->
superTable
);
}
}
else
{
grantRestore
(
TSDB_GRANT_TIMESERIES
,
pTable
->
numOfColumns
-
1
);
if
(
pAcct
!=
NULL
)
pAcct
->
acctInfo
.
numOfTimeSeries
-=
(
pTable
->
numOfColumns
-
1
);
...
...
src/query/src/qExecutor.c
浏览文件 @
3612ba04
...
...
@@ -354,7 +354,7 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
int16_t
bytes
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
int32_t
*
p1
=
(
int32_t
*
)
taosHashGet
(
pWindowResInfo
->
hashList
,
pData
,
bytes
);
int32_t
*
p1
=
(
int32_t
*
)
taosHashGet
(
pWindowResInfo
->
hashList
,
pData
,
bytes
);
if
(
p1
!=
NULL
)
{
pWindowResInfo
->
curIndex
=
*
p1
;
}
else
{
// more than the capacity, reallocate the resources
...
...
@@ -919,12 +919,25 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
SDiskbasedResultBuf
*
pResultBuf
=
pRuntimeEnv
->
pResultBuf
;
int64_t
v
=
-
1
;
// not assign result buffer yet, add new result buffer
switch
(
type
)
{
case
TSDB_DATA_TYPE_BOOL
:
case
TSDB_DATA_TYPE_TINYINT
:
v
=
GET_INT8_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
v
=
GET_INT16_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_INT
:
v
=
GET_INT32_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_BIGINT
:
v
=
GET_INT64_VAL
(
pData
);
break
;
}
// assert(pRuntimeEnv->windowResInfo.hashList->size <= 2);
SWindowResult
*
pWindowRes
=
doSetTimeWindowFromKey
(
pRuntimeEnv
,
&
pRuntimeEnv
->
windowResInfo
,
pData
,
bytes
);
if
(
pWindowRes
==
NULL
)
{
return
-
1
;
}
// not assign result buffer yet, add new result buffer
pWindowRes
->
window
.
skey
=
v
;
pWindowRes
->
window
.
ekey
=
v
;
if
(
pWindowRes
->
pos
.
pageId
==
-
1
)
{
int32_t
ret
=
addNewWindowResultBuf
(
pWindowRes
,
pResultBuf
,
GROUPRESULTID
,
pRuntimeEnv
->
numOfRowsPerPage
);
if
(
ret
!=
0
)
{
...
...
@@ -1022,12 +1035,16 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
return
false
;
}
if
(
functionId
==
TSDB_FUNC_LAST_DST
||
functionId
==
TSDB_FUNC_LAST
)
{
return
!
QUERY_IS_ASC_QUERY
(
pQuery
);
}
else
if
(
functionId
==
TSDB_FUNC_FIRST_DST
||
functionId
==
TSDB_FUNC_FIRST
)
{
if
(
functionId
==
TSDB_FUNC_FIRST_DST
||
functionId
==
TSDB_FUNC_FIRST
)
{
return
QUERY_IS_ASC_QUERY
(
pQuery
);
}
// todo add comments
if
((
functionId
==
TSDB_FUNC_LAST_DST
||
functionId
==
TSDB_FUNC_LAST
))
{
return
pCtx
->
param
[
0
].
i64Key
==
pQuery
->
order
.
order
;
// return !QUERY_IS_ASC_QUERY(pQuery);
}
// in the supplementary scan, only the following functions need to be executed
if
(
IS_REVERSE_SCAN
(
pRuntimeEnv
))
{
return
false
;
...
...
@@ -1079,7 +1096,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
int32_t
j
=
0
;
int32_t
offset
=
-
1
;
for
(
j
=
0
;
j
<
pDataBlockInfo
->
rows
;
++
j
)
{
offset
=
GET_COL_DATA_POS
(
pQuery
,
j
,
step
);
...
...
@@ -1479,19 +1496,6 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
static
bool
isQueryKilled
(
SQInfo
*
pQInfo
)
{
return
(
pQInfo
->
code
==
TSDB_CODE_TSC_QUERY_CANCELLED
);
#if 0
/*
* check if the queried meter is going to be deleted.
* if it will be deleted soon, stop current query ASAP.
*/
SMeterObj *pMeterObj = pQInfo->pObj;
if (vnodeIsMeterState(pMeterObj, TSDB_METER_STATE_DROPPING)) {
pQInfo->killed = 1;
return true;
}
return (pQInfo->killed == 1);
#endif
}
static
void
setQueryKilled
(
SQInfo
*
pQInfo
)
{
pQInfo
->
code
=
TSDB_CODE_TSC_QUERY_CANCELLED
;
}
...
...
@@ -1574,10 +1578,14 @@ static bool needReverseScan(SQuery *pQuery) {
continue
;
}
if
(((
functionId
==
TSDB_FUNC_LAST
||
functionId
==
TSDB_FUNC_LAST_DST
)
&&
QUERY_IS_ASC_QUERY
(
pQuery
))
||
((
functionId
==
TSDB_FUNC_FIRST
||
functionId
==
TSDB_FUNC_FIRST_DST
)
&&
!
QUERY_IS_ASC_QUERY
(
pQuery
)))
{
if
((
functionId
==
TSDB_FUNC_FIRST
||
functionId
==
TSDB_FUNC_FIRST_DST
)
&&
!
QUERY_IS_ASC_QUERY
(
pQuery
))
{
return
true
;
}
if
(
functionId
==
TSDB_FUNC_LAST
||
functionId
==
TSDB_FUNC_LAST_DST
)
{
int32_t
order
=
pQuery
->
pSelectExpr
[
i
].
base
.
arg
->
argValue
.
i64
;
return
order
!=
pQuery
->
order
.
order
;
}
}
return
false
;
...
...
@@ -2030,6 +2038,34 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
return
midPos
;
}
static
void
ensureOutputBufferSimple
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
capacity
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
if
(
capacity
<
pQuery
->
rec
.
capacity
)
{
return
;
}
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
int32_t
bytes
=
pQuery
->
pSelectExpr
[
i
].
bytes
;
assert
(
bytes
>
0
&&
capacity
>
0
);
char
*
tmp
=
realloc
(
pQuery
->
sdata
[
i
],
bytes
*
capacity
+
sizeof
(
tFilePage
));
if
(
tmp
==
NULL
)
{
// todo handle the oom
assert
(
0
);
}
else
{
pQuery
->
sdata
[
i
]
=
(
tFilePage
*
)
tmp
;
}
// set the pCtx output buffer position
pRuntimeEnv
->
pCtx
[
i
].
aOutputBuf
=
pQuery
->
sdata
[
i
]
->
data
;
}
qTrace
(
"QInfo:%p realloc output buffer to inc output buffer from: %d rows to:%d rows"
,
GET_QINFO_ADDR
(
pRuntimeEnv
),
pQuery
->
rec
.
capacity
,
capacity
);
pQuery
->
rec
.
capacity
=
capacity
;
}
static
void
ensureOutputBuffer
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SDataBlockInfo
*
pBlockInfo
)
{
// in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
...
...
@@ -2916,8 +2952,7 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) {
pRuntimeEnv
->
pCtx
[
i
].
ptsOutputBuf
=
pRuntimeEnv
->
pCtx
[
0
].
aOutputBuf
;
}
}
updateNumOfResult
(
pRuntimeEnv
,
pQuery
->
rec
.
rows
);
}
}
...
...
@@ -3054,7 +3089,7 @@ static void clearEnvAfterReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus
pQuery
->
window
=
pTableQueryInfo
->
win
;
}
void
scan
All
DataBlocks
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
TSKEY
start
)
{
void
scan
OneTable
DataBlocks
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
TSKEY
start
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
GET_QINFO_ADDR
(
pRuntimeEnv
);
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
STableQueryInfo
*
pTableQueryInfo
=
pQuery
->
current
;
...
...
@@ -3496,18 +3531,32 @@ void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResult *result) {
assert
(
pQuery
->
rec
.
rows
<=
pQuery
->
rec
.
capacity
);
}
static
void
updateWindowResNumOfRes
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
STableQueryInfo
*
pTableQueryInfo
)
{
static
UNUSED_FUNC
void
updateWindowResNumOfRes
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
STableQueryInfo
*
pTableQueryInfo
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
// update the number of result for each, only update the number of rows for the corresponding window result.
if
(
pQuery
->
intervalTime
==
0
)
{
int32_t
g
=
pTableQueryInfo
->
groupIndex
;
assert
(
pRuntimeEnv
->
windowResInfo
.
size
>
0
);
SWindowResult
*
pWindowRes
=
doSetTimeWindowFromKey
(
pRuntimeEnv
,
&
pRuntimeEnv
->
windowResInfo
,
(
char
*
)
&
g
,
sizeof
(
g
));
if
(
pWindowRes
->
numOfRows
==
0
)
{
pWindowRes
->
numOfRows
=
getNumOfResult
(
pRuntimeEnv
);
for
(
int32_t
i
=
0
;
i
<
pRuntimeEnv
->
windowResInfo
.
size
;
++
i
)
{
SWindowResult
*
pResult
=
&
pRuntimeEnv
->
windowResInfo
.
pResult
[
i
];
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
int32_t
functionId
=
pRuntimeEnv
->
pCtx
[
j
].
functionId
;
if
(
functionId
==
TSDB_FUNC_TS
||
functionId
==
TSDB_FUNC_TAG
||
functionId
==
TSDB_FUNC_TAGPRJ
)
{
continue
;
}
pResult
->
numOfRows
=
MAX
(
pResult
->
numOfRows
,
pResult
->
resultInfo
[
j
].
numOfRes
);
}
}
// int32_t g = pTableQueryInfo->groupIndex;
// assert(pRuntimeEnv->windowResInfo.size > 0);
//
// SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (char *)&g, sizeof(g));
// if (pWindowRes->numOfRows == 0) {
// pWindowRes->numOfRows = getNumOfResult(pRuntimeEnv);
// }
}
}
...
...
@@ -3519,7 +3568,7 @@ void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *
SWindowResInfo
*
pWindowResInfo
=
&
pTableQueryInfo
->
windowResInfo
;
pQuery
->
pos
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
0
:
pDataBlockInfo
->
rows
-
1
;
if
(
pQuery
->
numOfFilterCols
>
0
||
pRuntimeEnv
->
pTSBuf
!=
NULL
)
{
if
(
pQuery
->
numOfFilterCols
>
0
||
pRuntimeEnv
->
pTSBuf
!=
NULL
||
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
)
)
{
rowwiseApplyFunctions
(
pRuntimeEnv
,
pStatis
,
pDataBlockInfo
,
pWindowResInfo
,
pDataBlock
);
}
else
{
blockwiseApplyFunctions
(
pRuntimeEnv
,
pStatis
,
pDataBlockInfo
,
pWindowResInfo
,
searchFn
,
pDataBlock
);
...
...
@@ -4081,21 +4130,22 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
SDataStatis
*
pStatis
=
NULL
;
SArray
*
pDataBlock
=
loadDataBlockOnDemand
(
pRuntimeEnv
,
pQueryHandle
,
&
blockInfo
,
&
pStatis
);
if
(
!
isIntervalQuery
(
pQuery
))
{
int32_t
step
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
1
:-
1
;
setExecutionContext
(
pQInfo
,
&
pTableQueryInfo
->
id
,
pTableQueryInfo
->
groupIndex
,
blockInfo
.
window
.
ekey
+
step
);
}
else
{
// interval query
TSKEY
nextKey
=
blockInfo
.
window
.
skey
;
setIntervalQueryRange
(
pQInfo
,
nextKey
);
/*int32_t ret = */
setAdditionalInfo
(
pQInfo
,
&
pTableQueryInfo
->
id
,
pTableQueryInfo
);
if
(
!
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
))
{
if
(
!
isIntervalQuery
(
pQuery
))
{
int32_t
step
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
1
:-
1
;
setExecutionContext
(
pQInfo
,
&
pTableQueryInfo
->
id
,
pTableQueryInfo
->
groupIndex
,
blockInfo
.
window
.
ekey
+
step
);
}
else
{
// interval query
TSKEY
nextKey
=
blockInfo
.
window
.
skey
;
setIntervalQueryRange
(
pQInfo
,
nextKey
);
/*int32_t ret = */
setAdditionalInfo
(
pQInfo
,
&
pTableQueryInfo
->
id
,
pTableQueryInfo
);
}
}
summary
->
totalRows
+=
blockInfo
.
rows
;
stableApplyFunctionsOnBlock
(
pRuntimeEnv
,
&
blockInfo
,
pStatis
,
pDataBlock
,
binarySearchForKey
);
qTrace
(
"QInfo:%p check data block, uid:%"
PRId64
", tid:%d, brange:%"
PRId64
"-%"
PRId64
", numOfRows:%d, lastKey:%"
PRId64
,
GET_QINFO_ADDR
(
pRuntimeEnv
),
blockInfo
.
uid
,
blockInfo
.
tid
,
blockInfo
.
window
.
skey
,
blockInfo
.
window
.
ekey
,
blockInfo
.
rows
,
pQuery
->
current
->
lastKey
);
pQInfo
,
blockInfo
.
uid
,
blockInfo
.
tid
,
blockInfo
.
window
.
skey
,
blockInfo
.
window
.
ekey
,
blockInfo
.
rows
,
pQuery
->
current
->
lastKey
);
}
int64_t
et
=
taosGetTimestampMs
();
...
...
@@ -4220,7 +4270,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
// here we simply set the first table as current table
pQuery
->
current
=
((
SGroupItem
*
)
taosArrayGet
(
group
,
0
))
->
info
;
scan
All
DataBlocks
(
pRuntimeEnv
,
pQuery
->
current
->
lastKey
);
scan
OneTable
DataBlocks
(
pRuntimeEnv
,
pQuery
->
current
->
lastKey
);
int64_t
numOfRes
=
getNumOfResult
(
pRuntimeEnv
);
if
(
numOfRes
>
0
)
{
...
...
@@ -4233,10 +4283,84 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
// enable execution for next table, when handling the projection query
enableExecutionForNextTable
(
pRuntimeEnv
);
if
(
pQuery
->
rec
.
rows
>=
pQuery
->
rec
.
capacity
)
{
setQueryStatus
(
pQuery
,
QUERY_RESBUF_FULL
);
break
;
}
}
}
else
if
(
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
))
{
// group-by on normal columns query
while
(
pQInfo
->
groupIndex
<
numOfGroups
)
{
SArray
*
group
=
taosArrayGetP
(
pQInfo
->
groupInfo
.
pGroupList
,
pQInfo
->
groupIndex
);
qTrace
(
"QInfo:%p group by normal columns group:%d, total group:%d"
,
pQInfo
,
pQInfo
->
groupIndex
,
numOfGroups
);
STsdbQueryCond
cond
=
{
.
twindow
=
pQuery
->
window
,
.
colList
=
pQuery
->
colList
,
.
order
=
pQuery
->
order
.
order
,
.
numOfCols
=
pQuery
->
numOfCols
,
};
SArray
*
g1
=
taosArrayInit
(
1
,
POINTER_BYTES
);
SArray
*
tx
=
taosArrayClone
(
group
);
taosArrayPush
(
g1
,
&
tx
);
STableGroupInfo
gp
=
{.
numOfTables
=
taosArrayGetSize
(
tx
),
.
pGroupList
=
g1
};
// include only current table
if
(
pRuntimeEnv
->
pQueryHandle
!=
NULL
)
{
tsdbCleanupQueryHandle
(
pRuntimeEnv
->
pQueryHandle
);
pRuntimeEnv
->
pQueryHandle
=
NULL
;
}
pRuntimeEnv
->
pQueryHandle
=
tsdbQueryTables
(
pQInfo
->
tsdb
,
&
cond
,
&
gp
,
pQInfo
);
SArray
*
s
=
tsdbGetQueriedTableIdList
(
pRuntimeEnv
->
pQueryHandle
);
assert
(
taosArrayGetSize
(
s
)
>=
1
);
setTagVal
(
pRuntimeEnv
,
(
STableId
*
)
taosArrayGet
(
s
,
0
),
pQInfo
->
tsdb
);
// here we simply set the first table as current table
scanMultiTableDataBlocks
(
pQInfo
);
pQInfo
->
groupIndex
+=
1
;
SWindowResInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
windowResInfo
;
// no results generated for current group, continue to try the next group
if
(
pWindowResInfo
->
size
<=
0
)
{
continue
;
}
for
(
int32_t
i
=
0
;
i
<
pWindowResInfo
->
size
;
++
i
)
{
SWindowStatus
*
pStatus
=
&
pWindowResInfo
->
pResult
[
i
].
status
;
pStatus
->
closed
=
true
;
// enable return all results for group by normal columns
SWindowResult
*
pResult
=
&
pWindowResInfo
->
pResult
[
i
];
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
pResult
->
numOfRows
=
MAX
(
pResult
->
numOfRows
,
pResult
->
resultInfo
[
j
].
numOfRes
);
}
}
qTrace
(
"QInfo:%p generated groupby columns results %d rows for group %d completed"
,
pQInfo
,
pWindowResInfo
->
size
,
pQInfo
->
groupIndex
);
int32_t
currentGroupIndex
=
pQInfo
->
groupIndex
;
pQuery
->
rec
.
rows
=
0
;
pQInfo
->
groupIndex
=
0
;
ensureOutputBufferSimple
(
pRuntimeEnv
,
pWindowResInfo
->
size
);
copyFromWindowResToSData
(
pQInfo
,
pWindowResInfo
->
pResult
);
pQInfo
->
groupIndex
=
currentGroupIndex
;
//restore the group index
assert
(
pQuery
->
rec
.
rows
==
pWindowResInfo
->
size
);
clearClosedTimeWindow
(
pRuntimeEnv
);
break
;
}
}
else
{
/*
* 1. super table projection query, 2.
group-by on normal columns query, 3.
ts-comp query
* 1. super table projection query, 2. ts-comp query
* if the subgroup index is larger than 0, results generated by group by tbname,k is existed.
* we need to return it to client in the first place.
*/
...
...
@@ -4283,7 +4407,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
}
}
scan
All
DataBlocks
(
pRuntimeEnv
,
pQuery
->
current
->
lastKey
);
scan
OneTable
DataBlocks
(
pRuntimeEnv
,
pQuery
->
current
->
lastKey
);
skipResults
(
pRuntimeEnv
);
// the limitation of output result is reached, set the query completed
...
...
@@ -4349,25 +4473,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
pRuntimeEnv
->
cur
=
pRuntimeEnv
->
pTSBuf
->
cur
;
}
// todo refactor
if
(
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
))
{
SWindowResInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
windowResInfo
;
for
(
int32_t
i
=
0
;
i
<
pWindowResInfo
->
size
;
++
i
)
{
SWindowStatus
*
pStatus
=
&
pWindowResInfo
->
pResult
[
i
].
status
;
pStatus
->
closed
=
true
;
// enable return all results for group by normal columns
SWindowResult
*
pResult
=
&
pWindowResInfo
->
pResult
[
i
];
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
pResult
->
numOfRows
=
MAX
(
pResult
->
numOfRows
,
pResult
->
resultInfo
[
j
].
numOfRes
);
}
}
pQInfo
->
groupIndex
=
0
;
pQuery
->
rec
.
rows
=
0
;
copyFromWindowResToSData
(
pQInfo
,
pWindowResInfo
->
pResult
);
}
qTrace
(
"QInfo %p numOfTables:%d, index:%d, numOfGroups:%d, %d points returned, total:%"
PRId64
", offset:%"
PRId64
,
pQInfo
,
pQInfo
->
groupInfo
.
numOfTables
,
pQInfo
->
tableIndex
,
numOfGroups
,
pQuery
->
rec
.
rows
,
pQuery
->
rec
.
total
,
...
...
@@ -4449,7 +4554,6 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
*/
if
(
isIntervalQuery
(
pQuery
))
{
copyResToQueryResultBuf
(
pQInfo
,
pQuery
);
#ifdef _DEBUG_VIEW
displayInterResult
(
pQuery
->
sdata
,
pRuntimeEnv
,
pQuery
->
sdata
[
0
]
->
num
);
#endif
...
...
@@ -4527,7 +4631,7 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
pQuery
->
current
=
pTableInfo
;
// set current query table info
scan
All
DataBlocks
(
pRuntimeEnv
,
pTableInfo
->
lastKey
);
scan
OneTable
DataBlocks
(
pRuntimeEnv
,
pTableInfo
->
lastKey
);
finalizeQueryResult
(
pRuntimeEnv
);
if
(
isQueryKilled
(
pQInfo
))
{
...
...
@@ -4560,7 +4664,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
}
while
(
1
)
{
scan
All
DataBlocks
(
pRuntimeEnv
,
pQuery
->
current
->
lastKey
);
scan
OneTable
DataBlocks
(
pRuntimeEnv
,
pQuery
->
current
->
lastKey
);
finalizeQueryResult
(
pRuntimeEnv
);
if
(
isQueryKilled
(
pQInfo
))
{
...
...
@@ -4607,7 +4711,7 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start)
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
while
(
1
)
{
scan
All
DataBlocks
(
pRuntimeEnv
,
start
);
scan
OneTable
DataBlocks
(
pRuntimeEnv
,
start
);
if
(
isQueryKilled
(
GET_QINFO_ADDR
(
pRuntimeEnv
)))
{
return
;
...
...
src/query/src/qUtil.c
浏览文件 @
3612ba04
...
...
@@ -113,7 +113,9 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SWindowResult
*
pResult
=
&
pWindowResInfo
->
pResult
[
i
];
if
(
pResult
->
status
.
closed
)
{
// remove the window slot from hash table
taosHashRemove
(
pWindowResInfo
->
hashList
,
(
const
char
*
)
&
pResult
->
window
.
skey
,
TSDB_KEYSIZE
);
taosHashRemove
(
pWindowResInfo
->
hashList
,
(
const
char
*
)
&
pResult
->
window
.
skey
,
pWindowResInfo
->
type
);
printf
(
"remove ============>%ld, remain size:%ld
\n
"
,
pResult
->
window
.
skey
,
pWindowResInfo
->
hashList
->
size
);
}
else
{
break
;
}
...
...
@@ -133,14 +135,16 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
}
pWindowResInfo
->
size
=
remain
;
printf
(
"---------------size:%ld
\n
"
,
taosHashGetSize
(
pWindowResInfo
->
hashList
));
for
(
int32_t
k
=
0
;
k
<
pWindowResInfo
->
size
;
++
k
)
{
SWindowResult
*
pResult
=
&
pWindowResInfo
->
pResult
[
k
];
int32_t
*
p
=
(
int32_t
*
)
taosHashGet
(
pWindowResInfo
->
hashList
,
(
const
char
*
)
&
pResult
->
window
.
skey
,
TSDB_KEYSIZE
);
int32_t
*
p
=
(
int32_t
*
)
taosHashGet
(
pWindowResInfo
->
hashList
,
(
const
char
*
)
&
pResult
->
window
.
skey
,
tDataTypeDesc
[
pWindowResInfo
->
type
].
nSize
);
int32_t
v
=
(
*
p
-
num
);
assert
(
v
>=
0
&&
v
<=
pWindowResInfo
->
size
);
taosHashPut
(
pWindowResInfo
->
hashList
,
(
char
*
)
&
pResult
->
window
.
skey
,
TSDB_KEYSIZE
,
(
char
*
)
&
v
,
sizeof
(
int32_t
));
taosHashPut
(
pWindowResInfo
->
hashList
,
(
char
*
)
&
pResult
->
window
.
skey
,
tDataTypeDesc
[
pWindowResInfo
->
type
].
nSize
,
(
char
*
)
&
v
,
sizeof
(
int32_t
));
}
pWindowResInfo
->
curIndex
=
-
1
;
...
...
src/vnode/src/vnodeRead.c
浏览文件 @
3612ba04
...
...
@@ -61,7 +61,7 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
// notify connection(handle) that current qhandle is created, if current connection from
// client is broken, the query needs to be killed immediately.
static
void
vnodeNotifyCurrentQhandle
(
void
*
handle
,
void
*
qhandle
,
int32_t
vgId
)
{
static
int32_t
vnodeNotifyCurrentQhandle
(
void
*
handle
,
void
*
qhandle
,
int32_t
vgId
)
{
SRetrieveTableMsg
*
killQueryMsg
=
rpcMallocCont
(
sizeof
(
SRetrieveTableMsg
));
killQueryMsg
->
qhandle
=
htobe64
((
uint64_t
)
qhandle
);
killQueryMsg
->
free
=
htons
(
1
);
...
...
@@ -69,7 +69,7 @@ static void vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId)
killQueryMsg
->
header
.
contLen
=
htonl
(
sizeof
(
SRetrieveTableMsg
));
vTrace
(
"QInfo:%p register qhandle to connect:%p"
,
qhandle
,
handle
);
rpcReportProgress
(
handle
,
(
char
*
)
killQueryMsg
,
sizeof
(
SRetrieveTableMsg
));
r
eturn
r
pcReportProgress
(
handle
,
(
char
*
)
killQueryMsg
,
sizeof
(
SRetrieveTableMsg
));
}
static
int32_t
vnodeProcessQueryMsg
(
SVnodeObj
*
pVnode
,
SReadMsg
*
pReadMsg
)
{
...
...
@@ -106,7 +106,17 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
pRet
->
len
=
sizeof
(
SQueryTableRsp
);
pRet
->
rsp
=
pRsp
;
vnodeNotifyCurrentQhandle
(
pReadMsg
->
rpcMsg
.
handle
,
pQInfo
,
pVnode
->
vgId
);
// current connect is broken
if
(
vnodeNotifyCurrentQhandle
(
pReadMsg
->
rpcMsg
.
handle
,
pQInfo
,
pVnode
->
vgId
)
!=
TSDB_CODE_SUCCESS
)
{
vError
(
"vgId:%d, QInfo:%p, dnode query discarded since link is broken, %p"
,
pVnode
->
vgId
,
pQInfo
,
pReadMsg
->
rpcMsg
.
handle
);
pRsp
->
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
;
//NOTE: there two refcount, needs to kill twice, todo refactor
qKillQuery
(
pQInfo
);
qKillQuery
(
pQInfo
);
return
pRsp
->
code
;
}
vTrace
(
"vgId:%d, QInfo:%p, dnode query msg disposed"
,
pVnode
->
vgId
,
pQInfo
);
}
else
{
...
...
tests/script/general/parser/groupby.sim
浏览文件 @
3612ba04
...
...
@@ -355,7 +355,7 @@ if $data00 != 0 then
return -1
endi
if $data
0
1 != 800 then
if $data
1
1 != 800 then
return -1
endi
...
...
tests/script/general/parser/import_commit3.sim
浏览文件 @
3612ba04
...
...
@@ -25,7 +25,7 @@ step1:
sql create database $db cache 16
print ====== create tables
sql use $db
sql reset query cache
$i = 0
$ts = $ts0
$tb = $tbPrefix . $i
...
...
tests/script/general/parser/testSuite.sim
浏览文件 @
3612ba04
...
...
@@ -40,42 +40,42 @@
#run general/parser/nchar.sim
#sleep 2000
##run general/parser/null_char.sim
#
sleep 2000
#
run general/parser/single_row_in_tb.sim
#
sleep 2000
#
run general/parser/select_from_cache_disk.sim
#
sleep 2000
#
run general/parser/selectResNum.sim
#
sleep 2000
#
run general/parser/mixed_blocks.sim
#
sleep 2000
#
run general/parser/limit1.sim
#
sleep 2000
#
run general/parser/limit.sim
#
sleep 2000
#
run general/parser/limit1_tblocks100.sim
#
sleep 2000
#
run general/parser/select_across_vnodes.sim
#
sleep 2000
#
run general/parser/slimit1.sim
#
sleep 2000
#
run general/parser/tbnameIn.sim
#
sleep 2000
sleep 2000
run general/parser/single_row_in_tb.sim
sleep 2000
run general/parser/select_from_cache_disk.sim
sleep 2000
run general/parser/selectResNum.sim
sleep 2000
run general/parser/mixed_blocks.sim
sleep 2000
run general/parser/limit1.sim
sleep 2000
run general/parser/limit.sim
sleep 2000
run general/parser/limit1_tblocks100.sim
sleep 2000
run general/parser/select_across_vnodes.sim
sleep 2000
run general/parser/slimit1.sim
sleep 2000
run general/parser/tbnameIn.sim
sleep 2000
run general/parser/projection_limit_offset.sim
sleep 2000
run general/parser/limit2.sim
sleep 2000
#
run general/parser/fill.sim
#
sleep 2000
#
run general/parser/fill_stb.sim
#
sleep 2000
#
run general/parser/where.sim
#
sleep 2000
#
run general/parser/slimit.sim
#
sleep 2000
#
run general/parser/select_with_tags.sim
#
sleep 2000
#
run general/parser/interp.sim
run general/parser/fill.sim
sleep 2000
run general/parser/fill_stb.sim
sleep 2000
run general/parser/where.sim
sleep 2000
run general/parser/slimit.sim
sleep 2000
run general/parser/select_with_tags.sim
sleep 2000
run general/parser/interp.sim
sleep 2000
run general/parser/tags_dynamically_specifiy.sim
sleep 2000
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录