Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
36c9ba0c
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
36c9ba0c
编写于
2月 03, 2021
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[TD-2895] refactor
上级
19fb8c0d
变更
16
展开全部
隐藏空白更改
内联
并排
Showing
16 changed file
with
1334 addition
and
1213 deletion
+1334
-1213
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+1
-1
src/client/src/tscLocalMerge.c
src/client/src/tscLocalMerge.c
+0
-1
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+1
-1
src/client/tests/cliTest.cpp
src/client/tests/cliTest.cpp
+9
-9
src/cq/src/cqMain.c
src/cq/src/cqMain.c
+2
-1
src/query/inc/qAggMain.h
src/query/inc/qAggMain.h
+4
-5
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+81
-3
src/query/inc/qUtil.h
src/query/inc/qUtil.h
+8
-0
src/query/src/qAggMain.c
src/query/src/qAggMain.c
+15
-16
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+303
-1131
src/query/src/qUtil.c
src/query/src/qUtil.c
+248
-1
src/query/src/queryMain.c
src/query/src/queryMain.c
+536
-0
src/util/inc/tarray.h
src/util/inc/tarray.h
+1
-1
src/util/src/tarray.c
src/util/src/tarray.c
+8
-17
tests/script/general/parser/limit2_query.sim
tests/script/general/parser/limit2_query.sim
+91
-0
tests/script/general/parser/testSuite.sim
tests/script/general/parser/testSuite.sim
+26
-26
未找到文件。
src/client/src/tscAsync.c
浏览文件 @
36c9ba0c
...
...
@@ -322,7 +322,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
code
=
tscGetTableMeta
(
pSql
,
pTableMetaInfo
);
assert
(
code
==
TSDB_CODE_TSC_ACTION_IN_PROGRESS
||
code
==
TSDB_CODE_SUCCESS
);
if
(
code
==
TSDB_CODE_TSC_ACTION_IN_PROGRESS
)
{
if
(
code
==
TSDB_CODE_TSC_ACTION_IN_PROGRESS
)
{
taosReleaseRef
(
tscObjRef
,
pSql
->
self
);
return
;
}
...
...
src/client/src/tscLocalMerge.c
浏览文件 @
36c9ba0c
...
...
@@ -86,7 +86,6 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalMerger *pReducer, tOrderDescr
pCtx
->
outputBytes
=
pExpr
->
resBytes
;
pCtx
->
outputType
=
pExpr
->
resType
;
pCtx
->
startOffset
=
0
;
pCtx
->
size
=
1
;
pCtx
->
hasNull
=
true
;
pCtx
->
currentStage
=
MERGE_STAGE
;
...
...
src/client/src/tscSQLParser.c
浏览文件 @
36c9ba0c
...
...
@@ -4737,7 +4737,7 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) {
int32_t
parseOrderbyClause
(
SSqlCmd
*
pCmd
,
SQueryInfo
*
pQueryInfo
,
SQuerySQL
*
pQuerySql
,
SSchema
*
pSchema
)
{
const
char
*
msg0
=
"only support order by primary timestamp"
;
const
char
*
msg1
=
"invalid column name"
;
const
char
*
msg2
=
"o
nly support o
rder by primary timestamp or first tag in groupby clause allowed"
;
const
char
*
msg2
=
"order by primary timestamp or first tag in groupby clause allowed"
;
const
char
*
msg3
=
"invalid column in order by clause, only primary timestamp or first tag in groupby clause allowed"
;
setDefaultOrderInfo
(
pQueryInfo
);
...
...
src/client/tests/cliTest.cpp
浏览文件 @
36c9ba0c
...
...
@@ -57,7 +57,7 @@ void stmtInsertTest() {
v
.
ts
=
start_ts
+
20
;
v
.
k
=
123
;
char
*
str
=
"abc"
;
char
str
[]
=
"abc"
;
uintptr_t
len
=
strlen
(
str
);
v
.
a
=
str
;
...
...
@@ -65,7 +65,7 @@ void stmtInsertTest() {
params
[
2
].
buffer_length
=
len
;
params
[
2
].
buffer
=
str
;
char
*
nstr
=
"999"
;
char
nstr
[]
=
"999"
;
uintptr_t
len1
=
strlen
(
nstr
);
v
.
b
=
nstr
;
...
...
@@ -84,18 +84,18 @@ void stmtInsertTest() {
v
.
ts
=
start_ts
+
30
;
v
.
k
=
911
;
str
=
"92"
;
len
=
strlen
(
str
);
char
str1
[]
=
"92"
;
len
=
strlen
(
str
1
);
params
[
2
].
length
=
&
len
;
params
[
2
].
buffer_length
=
len
;
params
[
2
].
buffer
=
str
;
params
[
2
].
buffer
=
str
1
;
nstr
=
"1920"
;
len1
=
strlen
(
nstr
);
char
nstr1
[]
=
"1920"
;
len1
=
strlen
(
nstr
1
);
params
[
3
].
buffer_length
=
len1
;
params
[
3
].
buffer
=
nstr
;
params
[
3
].
buffer
=
nstr
1
;
params
[
3
].
length
=
&
len1
;
taos_stmt_bind_param
(
stmt
,
params
);
...
...
@@ -103,7 +103,7 @@ void stmtInsertTest() {
ret
=
taos_stmt_execute
(
stmt
);
if
(
ret
!=
0
)
{
printf
(
"%
p
\n
"
,
ret
);
printf
(
"%
d
\n
"
,
ret
);
printf
(
"
\033
[31mfailed to execute insert statement.
\033
[0m
\n
"
);
return
;
}
...
...
src/cq/src/cqMain.c
浏览文件 @
36c9ba0c
...
...
@@ -190,7 +190,7 @@ static void freeSCqContext(void *handle) {
}
SCqContext
*
pContext
=
handle
;
pthread_mutex_destroy
(
&
pContext
->
mutex
);
taosTmrCleanUp
(
pContext
->
tmrCtrl
);
pContext
->
tmrCtrl
=
NULL
;
cDebug
(
"vgId:%d, CQ is closed"
,
pContext
->
vgId
);
...
...
@@ -256,6 +256,7 @@ void cqStop(void *handle) {
if
(
tsEnableStream
==
0
)
{
return
;
}
SCqContext
*
pContext
=
handle
;
cDebug
(
"vgId:%d, stop all CQs"
,
pContext
->
vgId
);
if
(
pContext
->
dbConn
==
NULL
||
pContext
->
master
==
0
)
return
;
...
...
src/query/inc/qAggMain.h
浏览文件 @
36c9ba0c
...
...
@@ -84,7 +84,7 @@ extern "C" {
#define TSDB_FUNCSTATE_SO 0x1u // single output
#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
#define TSDB_FUNCSTATE_STREAM 0x4u // function avail for stream
#define TSDB_FUNCSTATE_STABLE 0x8u // function avail for
metric
#define TSDB_FUNCSTATE_STABLE 0x8u // function avail for
super table
#define TSDB_FUNCSTATE_OF 0x10u // outer forward
#define TSDB_FUNCSTATE_NEED_TS 0x20u // timestamp is required during query processing
#define TSDB_FUNCSTATE_SELECTIVITY 0x40u // selectivity functions, can exists along with tag columns
...
...
@@ -166,9 +166,8 @@ typedef struct SExtTagsInfo {
// sql function runtime context
typedef
struct
SQLFunctionCtx
{
int32_t
startOffset
;
// todo remove it
int32_t
size
;
// number of rows
void
*
pInput
;
//
void
*
pInput
;
//
input data buffer
uint32_t
order
;
// asc|desc
int16_t
inputType
;
int16_t
inputBytes
;
...
...
@@ -184,7 +183,7 @@ typedef struct SQLFunctionCtx {
uint8_t
currentStage
;
// record current running step, default: 0
int64_t
startTs
;
// timestamp range of current query when function is executed on a specific data block
int32_t
numOfParams
;
tVariant
param
[
4
];
// input parameter, e.g., top(k, 20), the number of results for top query is kept in param
*/
tVariant
param
[
4
];
// input parameter, e.g., top(k, 20), the number of results for top query is kept in param
int64_t
*
ptsList
;
// corresponding timestamp array list
void
*
ptsOutputBuf
;
// corresponding output buffer for timestamp of each result, e.g., top/bottom*/
SQLPreAggVal
preAggVals
;
...
...
@@ -228,7 +227,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
#define IS_SINGLEOUTPUT(x) (((x)&TSDB_FUNCSTATE_SO) != 0)
#define IS_OUTER_FORWARD(x) (((x)&TSDB_FUNCSTATE_OF) != 0)
/
* determine the real data need to calculated the result */
/
/ determine the real data need to calculated the result
enum
{
BLK_DATA_NO_NEEDED
=
0x0
,
BLK_DATA_STATIS_NEEDED
=
0x1
,
...
...
src/query/inc/qExecutor.h
浏览文件 @
36c9ba0c
...
...
@@ -33,6 +33,36 @@ struct SColumnFilterElem;
typedef
bool
(
*
__filter_func_t
)(
struct
SColumnFilterElem
*
pFilter
,
const
char
*
val1
,
const
char
*
val2
,
int16_t
type
);
typedef
int32_t
(
*
__block_search_fn_t
)(
char
*
data
,
int32_t
num
,
int64_t
key
,
int32_t
order
);
#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u)
#define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP)
#define SET_STABLE_QUERY_OVER(_q) ((_q)->tableIndex = (int32_t)((_q)->tableqinfoGroupInfo.numOfTables))
#define IS_STASBLE_QUERY_OVER(_q) ((_q)->tableIndex >= (int32_t)((_q)->tableqinfoGroupInfo.numOfTables))
#define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
enum
{
// when query starts to execute, this status will set
QUERY_NOT_COMPLETED
=
0x1u
,
/* result output buffer is full, current query is paused.
* this status is only exist in group-by clause and diff/add/division/multiply/ query.
*/
QUERY_RESBUF_FULL
=
0x2u
,
/* query is over
* 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc.
* 2. when all data within queried time window, it is also denoted as query_completed
*/
QUERY_COMPLETED
=
0x4u
,
/* when the result is not completed return to client, this status will be
* usually used in case of interval query with interpolation option
*/
QUERY_OVER
=
0x8u
,
};
typedef
struct
SResultRowPool
{
int32_t
elemSize
;
int32_t
blockSize
;
...
...
@@ -66,7 +96,8 @@ typedef struct SResultRow {
}
SResultRow
;
typedef
struct
SGroupResInfo
{
int32_t
rowId
;
int32_t
totalGroup
;
int32_t
currentGroup
;
int32_t
index
;
SArray
*
pRows
;
// SArray<SResultRow*>
}
SGroupResInfo
;
...
...
@@ -112,7 +143,7 @@ typedef struct STableQueryInfo {
STimeWindow
win
;
STSCursor
cur
;
void
*
pTable
;
// for retrieve the page id list
SResultRowInfo
windowR
esInfo
;
SResultRowInfo
r
esInfo
;
}
STableQueryInfo
;
typedef
struct
SQueryCostInfo
{
...
...
@@ -193,7 +224,7 @@ typedef struct SQueryRuntimeEnv {
uint16_t
*
offset
;
uint16_t
scanFlag
;
// denotes reversed scan of data or not
SFillInfo
*
pFillInfo
;
SResultRowInfo
windowRes
Info
;
SResultRowInfo
resultRow
Info
;
SQueryCostInfo
summary
;
void
*
pQueryHandle
;
...
...
@@ -257,4 +288,51 @@ typedef struct SQInfo {
char
*
sql
;
// query sql string
}
SQInfo
;
typedef
struct
SQueryParam
{
char
*
sql
;
char
*
tagCond
;
char
*
tbnameCond
;
char
*
prevResult
;
SArray
*
pTableIdList
;
SSqlFuncMsg
**
pExprMsg
;
SSqlFuncMsg
**
pSecExprMsg
;
SExprInfo
*
pExprs
;
SExprInfo
*
pSecExprs
;
SColIndex
*
pGroupColIndex
;
SColumnInfo
*
pTagColumnInfo
;
SSqlGroupbyExpr
*
pGroupbyExpr
;
}
SQueryParam
;
void
freeParam
(
SQueryParam
*
param
);
int32_t
convertQueryMsg
(
SQueryTableMsg
*
pQueryMsg
,
SQueryParam
*
param
);
int32_t
createQueryFuncExprFromMsg
(
SQueryTableMsg
*
pQueryMsg
,
int32_t
numOfOutput
,
SExprInfo
**
pExprInfo
,
SSqlFuncMsg
**
pExprMsg
,
SColumnInfo
*
pTagCols
);
SSqlGroupbyExpr
*
createGroupbyExprFromMsg
(
SQueryTableMsg
*
pQueryMsg
,
SColIndex
*
pColIndex
,
int32_t
*
code
);
SQInfo
*
createQInfoImpl
(
SQueryTableMsg
*
pQueryMsg
,
SSqlGroupbyExpr
*
pGroupbyExpr
,
SExprInfo
*
pExprs
,
SExprInfo
*
pSecExprs
,
STableGroupInfo
*
pTableGroupInfo
,
SColumnInfo
*
pTagCols
,
bool
stableQuery
,
char
*
sql
);
int32_t
initQInfo
(
SQueryTableMsg
*
pQueryMsg
,
void
*
tsdb
,
int32_t
vgId
,
SQInfo
*
pQInfo
,
SQueryParam
*
param
,
bool
isSTable
);
void
freeColumnFilterInfo
(
SColumnFilterInfo
*
pFilter
,
int32_t
numOfFilters
);
bool
isQueryKilled
(
SQInfo
*
pQInfo
);
int32_t
checkForQueryBuf
(
size_t
numOfTables
);
bool
doBuildResCheck
(
SQInfo
*
pQInfo
);
void
setQueryStatus
(
SQuery
*
pQuery
,
int8_t
status
);
bool
onlyQueryTags
(
SQuery
*
pQuery
);
void
buildTagQueryResult
(
SQInfo
*
pQInfo
);
void
stableQueryImpl
(
SQInfo
*
pQInfo
);
void
buildTableBlockDistResult
(
SQInfo
*
pQInfo
);
void
tableQueryImpl
(
SQInfo
*
pQInfo
);
bool
isValidQInfo
(
void
*
param
);
int32_t
doDumpQueryResult
(
SQInfo
*
pQInfo
,
char
*
data
);
size_t
getResultSize
(
SQInfo
*
pQInfo
,
int64_t
*
numOfRows
);
void
setQueryKilled
(
SQInfo
*
pQInfo
);
void
queryCostStatis
(
SQInfo
*
pQInfo
);
void
freeQInfo
(
SQInfo
*
pQInfo
);
int32_t
getMaximumIdleDurationSec
();
#endif // TDENGINE_QUERYEXECUTOR_H
src/query/inc/qUtil.h
浏览文件 @
36c9ba0c
...
...
@@ -85,4 +85,12 @@ void interResToBinary(SBufferWriter* bw, SArray* pRes, int32_t tagLen);
SArray
*
interResFromBinary
(
const
char
*
data
,
int32_t
len
);
void
freeInterResult
(
void
*
param
);
void
initGroupResInfo
(
SGroupResInfo
*
pGroupResInfo
,
SResultRowInfo
*
pResultInfo
,
int32_t
offset
);
void
cleanupGroupResInfo
(
SGroupResInfo
*
pGroupResInfo
);
bool
hasRemainData
(
SGroupResInfo
*
pGroupResInfo
);
bool
incNextGroup
(
SGroupResInfo
*
pGroupResInfo
);
int32_t
getNumOfTotalRes
(
SGroupResInfo
*
pGroupResInfo
);
int32_t
mergeIntoGroupResult
(
SGroupResInfo
*
pGroupResInfo
,
SQInfo
*
pQInfo
);
#endif // TDENGINE_QUERYUTIL_H
src/query/src/qAggMain.c
浏览文件 @
36c9ba0c
...
...
@@ -26,10 +26,12 @@
#include "qTsbuf.h"
#include "queryLog.h"
#define GET_INPUT_DATA_LIST(x) (((char *)((x)->pInput)) + ((x)->startOffset) * ((x)->inputBytes))
//#define GET_INPUT_DATA_LIST(x) (((char *)((x)->pInput)) + ((x)->startOffset) * ((x)->inputBytes))
#define GET_INPUT_DATA_LIST(x) ((char *)((x)->pInput))
#define GET_INPUT_DATA(x, y) (GET_INPUT_DATA_LIST(x) + (y) * (x)->inputBytes)
#define GET_TS_LIST(x) ((TSKEY*)&((x)->ptsList[(x)->startOffset]))
//#define GET_TS_LIST(x) ((TSKEY*)&((x)->ptsList[(x)->startOffset]))
#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList))
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])
#define GET_TRUE_DATA_TYPE() \
...
...
@@ -379,11 +381,7 @@ static bool function_setup(SQLFunctionCtx *pCtx) {
static
void
function_finalizer
(
SQLFunctionCtx
*
pCtx
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
if
(
pResInfo
->
hasResult
!=
DATA_SET_FLAG
)
{
if
(
pCtx
->
outputType
==
TSDB_DATA_TYPE_BINARY
||
pCtx
->
outputType
==
TSDB_DATA_TYPE_NCHAR
)
{
setVardataNull
(
pCtx
->
pOutput
,
pCtx
->
outputType
);
}
else
{
setNull
(
pCtx
->
pOutput
,
pCtx
->
outputType
,
pCtx
->
outputBytes
);
}
setNull
(
pCtx
->
pOutput
,
pCtx
->
outputType
,
pCtx
->
outputBytes
);
}
doFinalizer
(
pCtx
);
...
...
@@ -414,10 +412,7 @@ static void count_function(SQLFunctionCtx *pCtx) {
numOfElem
+=
1
;
}
}
else
{
/*
* when counting on the primary time stamp column and no statistics data is provided,
* simple use the size value
*/
//when counting on the primary time stamp column and no statistics data is presented, use the size value directly.
numOfElem
=
pCtx
->
size
;
}
}
...
...
@@ -944,9 +939,9 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
*
* The following codes of 3 lines will be removed later.
*/
if
(
index
<
0
||
index
>=
pCtx
->
size
+
pCtx
->
startOffset
)
{
index
=
0
;
}
//
if (index < 0 || index >= pCtx->size + pCtx->startOffset) {
//
index = 0;
//
}
// the index is the original position, not the relative position
key
=
pCtx
->
ptsList
[
index
];
...
...
@@ -3487,9 +3482,7 @@ static void arithmetic_function(SQLFunctionCtx *pCtx) {
SArithmeticSupport
*
sas
=
(
SArithmeticSupport
*
)
pCtx
->
param
[
1
].
pz
;
arithmeticTreeTraverse
(
sas
->
pArithExpr
->
pExpr
,
pCtx
->
size
,
pCtx
->
pOutput
,
sas
,
pCtx
->
order
,
getArithColumnData
);
pCtx
->
pOutput
+=
pCtx
->
outputBytes
*
pCtx
->
size
;
pCtx
->
param
[
1
].
pz
=
NULL
;
}
static
void
arithmetic_function_f
(
SQLFunctionCtx
*
pCtx
,
int32_t
index
)
{
...
...
@@ -3977,6 +3970,12 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
}
else
{
assignVal
(
pCtx
->
pOutput
,
pCtx
->
start
.
ptr
,
pCtx
->
outputBytes
,
pCtx
->
inputType
);
}
}
else
if
(
type
==
TSDB_FILL_NEXT
)
{
if
(
IS_NUMERIC_TYPE
(
pCtx
->
inputType
)
||
pCtx
->
inputType
==
TSDB_DATA_TYPE_BOOL
)
{
SET_TYPED_DATA
(
pCtx
->
pOutput
,
pCtx
->
inputType
,
pCtx
->
end
.
val
);
}
else
{
assignVal
(
pCtx
->
pOutput
,
pCtx
->
end
.
ptr
,
pCtx
->
outputBytes
,
pCtx
->
inputType
);
}
}
else
if
(
type
==
TSDB_FILL_LINEAR
)
{
SPoint
point1
=
{.
key
=
pCtx
->
start
.
key
,
.
val
=
&
pCtx
->
start
.
val
};
SPoint
point2
=
{.
key
=
pCtx
->
end
.
key
,
.
val
=
&
pCtx
->
end
.
val
};
...
...
src/query/src/qExecutor.c
浏览文件 @
36c9ba0c
此差异已折叠。
点击以展开。
src/query/src/qUtil.c
浏览文件 @
36c9ba0c
...
...
@@ -20,6 +20,14 @@
#include "qExecutor.h"
#include "qUtil.h"
#include "tbuffer.h"
#include "tlosertree.h"
#include "queryLog.h"
typedef
struct
SCompSupporter
{
STableQueryInfo
**
pTableQueryInfo
;
int32_t
*
rowIndex
;
int32_t
order
;
}
SCompSupporter
;
int32_t
getOutputInterResultBufSize
(
SQuery
*
pQuery
)
{
int32_t
size
=
0
;
...
...
@@ -322,4 +330,243 @@ void freeInterResult(void* param) {
}
taosArrayDestroy
(
pResult
->
pResult
);
}
\ No newline at end of file
}
void
cleanupGroupResInfo
(
SGroupResInfo
*
pGroupResInfo
)
{
assert
(
pGroupResInfo
!=
NULL
);
taosArrayDestroy
(
pGroupResInfo
->
pRows
);
pGroupResInfo
->
pRows
=
NULL
;
pGroupResInfo
->
index
=
0
;
}
void
initGroupResInfo
(
SGroupResInfo
*
pGroupResInfo
,
SResultRowInfo
*
pResultInfo
,
int32_t
offset
)
{
if
(
pGroupResInfo
->
pRows
!=
NULL
)
{
taosArrayDestroy
(
pGroupResInfo
->
pRows
);
}
pGroupResInfo
->
pRows
=
taosArrayFromList
(
pResultInfo
->
pResult
,
pResultInfo
->
size
,
POINTER_BYTES
);
pGroupResInfo
->
index
=
offset
;
assert
(
pGroupResInfo
->
index
<=
getNumOfTotalRes
(
pGroupResInfo
));
}
bool
hasRemainData
(
SGroupResInfo
*
pGroupResInfo
)
{
if
(
pGroupResInfo
->
pRows
==
NULL
)
{
return
false
;
}
return
pGroupResInfo
->
index
<
taosArrayGetSize
(
pGroupResInfo
->
pRows
);
}
bool
incNextGroup
(
SGroupResInfo
*
pGroupResInfo
)
{
return
(
++
pGroupResInfo
->
currentGroup
)
<
pGroupResInfo
->
totalGroup
;
}
int32_t
getNumOfTotalRes
(
SGroupResInfo
*
pGroupResInfo
)
{
assert
(
pGroupResInfo
!=
NULL
);
if
(
pGroupResInfo
->
pRows
==
0
)
{
return
0
;
}
return
taosArrayGetSize
(
pGroupResInfo
->
pRows
);
}
static
int64_t
getNumOfResultWindowRes
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pResultRow
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
int32_t
functionId
=
pQuery
->
pExpr1
[
j
].
base
.
functionId
;
/*
* ts, tag, tagprj function can not decide the output number of current query
* the number of output result is decided by main output
*/
if
(
functionId
==
TSDB_FUNC_TS
||
functionId
==
TSDB_FUNC_TAG
||
functionId
==
TSDB_FUNC_TAGPRJ
)
{
continue
;
}
SResultRowCellInfo
*
pResultInfo
=
getResultCell
(
pRuntimeEnv
,
pResultRow
,
j
);
assert
(
pResultInfo
!=
NULL
);
if
(
pResultInfo
->
numOfRes
>
0
)
{
return
pResultInfo
->
numOfRes
;
}
}
return
0
;
}
static
int32_t
tableResultComparFn
(
const
void
*
pLeft
,
const
void
*
pRight
,
void
*
param
)
{
int32_t
left
=
*
(
int32_t
*
)
pLeft
;
int32_t
right
=
*
(
int32_t
*
)
pRight
;
SCompSupporter
*
supporter
=
(
SCompSupporter
*
)
param
;
int32_t
leftPos
=
supporter
->
rowIndex
[
left
];
int32_t
rightPos
=
supporter
->
rowIndex
[
right
];
/* left source is exhausted */
if
(
leftPos
==
-
1
)
{
return
1
;
}
/* right source is exhausted*/
if
(
rightPos
==
-
1
)
{
return
-
1
;
}
STableQueryInfo
**
pList
=
supporter
->
pTableQueryInfo
;
SResultRowInfo
*
pWindowResInfo1
=
&
(
pList
[
left
]
->
resInfo
);
SResultRow
*
pWindowRes1
=
getResultRow
(
pWindowResInfo1
,
leftPos
);
TSKEY
leftTimestamp
=
pWindowRes1
->
win
.
skey
;
SResultRowInfo
*
pWindowResInfo2
=
&
(
pList
[
right
]
->
resInfo
);
SResultRow
*
pWindowRes2
=
getResultRow
(
pWindowResInfo2
,
rightPos
);
TSKEY
rightTimestamp
=
pWindowRes2
->
win
.
skey
;
if
(
leftTimestamp
==
rightTimestamp
)
{
return
0
;
}
if
(
supporter
->
order
==
TSDB_ORDER_ASC
)
{
return
(
leftTimestamp
>
rightTimestamp
)
?
1
:-
1
;
}
else
{
return
(
leftTimestamp
<
rightTimestamp
)
?
1
:-
1
;
}
}
static
int32_t
mergeIntoGroupResultImpl
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SGroupResInfo
*
pGroupResInfo
,
SArray
*
pTableList
,
void
*
qinfo
)
{
bool
ascQuery
=
QUERY_IS_ASC_QUERY
(
pRuntimeEnv
->
pQuery
);
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
*
posList
=
NULL
;
SLoserTreeInfo
*
pTree
=
NULL
;
STableQueryInfo
**
pTableQueryInfoList
=
NULL
;
size_t
size
=
taosArrayGetSize
(
pTableList
);
if
(
pGroupResInfo
->
pRows
==
NULL
)
{
pGroupResInfo
->
pRows
=
taosArrayInit
(
100
,
POINTER_BYTES
);
}
posList
=
calloc
(
size
,
sizeof
(
int32_t
));
pTableQueryInfoList
=
malloc
(
POINTER_BYTES
*
size
);
if
(
pTableQueryInfoList
==
NULL
||
posList
==
NULL
||
pGroupResInfo
->
pRows
==
NULL
||
pGroupResInfo
->
pRows
==
NULL
)
{
qError
(
"QInfo:%p failed alloc memory"
,
qinfo
);
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
goto
_end
;
}
int32_t
numOfTables
=
0
;
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
STableQueryInfo
*
item
=
taosArrayGetP
(
pTableList
,
i
);
if
(
item
->
resInfo
.
size
>
0
)
{
pTableQueryInfoList
[
numOfTables
++
]
=
item
;
}
}
// there is no data in current group
// no need to merge results since only one table in each group
if
(
numOfTables
==
0
)
{
goto
_end
;
}
SCompSupporter
cs
=
{
pTableQueryInfoList
,
posList
,
pRuntimeEnv
->
pQuery
->
order
.
order
};
int32_t
ret
=
tLoserTreeCreate
(
&
pTree
,
numOfTables
,
&
cs
,
tableResultComparFn
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
goto
_end
;
}
int64_t
lastTimestamp
=
ascQuery
?
INT64_MIN
:
INT64_MAX
;
int64_t
startt
=
taosGetTimestampMs
();
while
(
1
)
{
int32_t
tableIndex
=
pTree
->
pNode
[
0
].
index
;
SResultRowInfo
*
pWindowResInfo
=
&
pTableQueryInfoList
[
tableIndex
]
->
resInfo
;
SResultRow
*
pWindowRes
=
getResultRow
(
pWindowResInfo
,
cs
.
rowIndex
[
tableIndex
]);
int64_t
num
=
getNumOfResultWindowRes
(
pRuntimeEnv
,
pWindowRes
);
if
(
num
<=
0
)
{
cs
.
rowIndex
[
tableIndex
]
+=
1
;
if
(
cs
.
rowIndex
[
tableIndex
]
>=
pWindowResInfo
->
size
)
{
cs
.
rowIndex
[
tableIndex
]
=
-
1
;
if
(
--
numOfTables
==
0
)
{
// all input sources are exhausted
break
;
}
}
}
else
{
assert
((
pWindowRes
->
win
.
skey
>=
lastTimestamp
&&
ascQuery
)
||
(
pWindowRes
->
win
.
skey
<=
lastTimestamp
&&
!
ascQuery
));
if
(
pWindowRes
->
win
.
skey
!=
lastTimestamp
)
{
taosArrayPush
(
pGroupResInfo
->
pRows
,
&
pWindowRes
);
pWindowRes
->
numOfRows
=
(
uint32_t
)
num
;
}
lastTimestamp
=
pWindowRes
->
win
.
skey
;
// move to the next row of current entry
if
((
++
cs
.
rowIndex
[
tableIndex
])
>=
pWindowResInfo
->
size
)
{
cs
.
rowIndex
[
tableIndex
]
=
-
1
;
// all input sources are exhausted
if
((
--
numOfTables
)
==
0
)
{
break
;
}
}
}
tLoserTreeAdjust
(
pTree
,
tableIndex
+
pTree
->
numOfEntries
);
}
int64_t
endt
=
taosGetTimestampMs
();
qDebug
(
"QInfo:%p result merge completed for group:%d, elapsed time:%"
PRId64
" ms"
,
qinfo
,
pGroupResInfo
->
currentGroup
,
endt
-
startt
);
_end:
tfree
(
pTableQueryInfoList
);
tfree
(
posList
);
tfree
(
pTree
);
return
code
;
}
int32_t
mergeIntoGroupResult
(
SGroupResInfo
*
pGroupResInfo
,
SQInfo
*
pQInfo
)
{
int64_t
st
=
taosGetTimestampUs
();
while
(
pGroupResInfo
->
currentGroup
<
pGroupResInfo
->
totalGroup
)
{
SArray
*
group
=
GET_TABLEGROUP
(
pQInfo
,
pGroupResInfo
->
currentGroup
);
int32_t
ret
=
mergeIntoGroupResultImpl
(
&
pQInfo
->
runtimeEnv
,
pGroupResInfo
,
group
,
pQInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
ret
;
}
// this group generates at least one result, return results
if
(
taosArrayGetSize
(
pGroupResInfo
->
pRows
)
>
0
)
{
break
;
}
qDebug
(
"QInfo:%p no result in group %d, continue"
,
pQInfo
,
pGroupResInfo
->
currentGroup
);
cleanupGroupResInfo
(
pGroupResInfo
);
incNextGroup
(
pGroupResInfo
);
}
if
(
pGroupResInfo
->
currentGroup
>=
pGroupResInfo
->
totalGroup
&&
!
hasRemainData
(
pGroupResInfo
))
{
SET_STABLE_QUERY_OVER
(
pQInfo
);
}
int64_t
elapsedTime
=
taosGetTimestampUs
()
-
st
;
qDebug
(
"QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%"
PRId64
"us"
,
pQInfo
,
pGroupResInfo
->
currentGroup
,
pGroupResInfo
->
totalGroup
,
elapsedTime
);
pQInfo
->
runtimeEnv
.
summary
.
firstStageMergeTime
+=
elapsedTime
;
return
TSDB_CODE_SUCCESS
;
}
src/query/src/queryMain.c
0 → 100644
浏览文件 @
36c9ba0c
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "qFill.h"
#include "taosmsg.h"
#include "tcache.h"
#include "tglobal.h"
#include "exception.h"
#include "hash.h"
#include "texpr.h"
#include "qExecutor.h"
#include "qResultbuf.h"
#include "qUtil.h"
#include "query.h"
#include "queryLog.h"
#include "tlosertree.h"
#include "ttype.h"
#include "tcompare.h"
typedef
struct
SQueryMgmt
{
pthread_mutex_t
lock
;
SCacheObj
*
qinfoPool
;
// query handle pool
int32_t
vgId
;
bool
closed
;
}
SQueryMgmt
;
static
void
queryMgmtKillQueryFn
(
void
*
handle
)
{
void
**
fp
=
(
void
**
)
handle
;
qKillQuery
(
*
fp
);
}
static
void
freeqinfoFn
(
void
*
qhandle
)
{
void
**
handle
=
qhandle
;
if
(
handle
==
NULL
||
*
handle
==
NULL
)
{
return
;
}
qKillQuery
(
*
handle
);
qDestroyQueryInfo
(
*
handle
);
}
void
freeParam
(
SQueryParam
*
param
)
{
tfree
(
param
->
sql
);
tfree
(
param
->
tagCond
);
tfree
(
param
->
tbnameCond
);
tfree
(
param
->
pTableIdList
);
tfree
(
param
->
pExprMsg
);
tfree
(
param
->
pSecExprMsg
);
tfree
(
param
->
pExprs
);
tfree
(
param
->
pSecExprs
);
tfree
(
param
->
pGroupColIndex
);
tfree
(
param
->
pTagColumnInfo
);
tfree
(
param
->
pGroupbyExpr
);
tfree
(
param
->
prevResult
);
}
int32_t
qCreateQueryInfo
(
void
*
tsdb
,
int32_t
vgId
,
SQueryTableMsg
*
pQueryMsg
,
qinfo_t
*
pQInfo
)
{
assert
(
pQueryMsg
!=
NULL
&&
tsdb
!=
NULL
);
int32_t
code
=
TSDB_CODE_SUCCESS
;
SQueryParam
param
=
{
0
};
code
=
convertQueryMsg
(
pQueryMsg
,
&
param
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_over
;
}
if
(
pQueryMsg
->
numOfTables
<=
0
)
{
qError
(
"Invalid number of tables to query, numOfTables:%d"
,
pQueryMsg
->
numOfTables
);
code
=
TSDB_CODE_QRY_INVALID_MSG
;
goto
_over
;
}
if
(
param
.
pTableIdList
==
NULL
||
taosArrayGetSize
(
param
.
pTableIdList
)
==
0
)
{
qError
(
"qmsg:%p, SQueryTableMsg wrong format"
,
pQueryMsg
);
code
=
TSDB_CODE_QRY_INVALID_MSG
;
goto
_over
;
}
if
((
code
=
createQueryFuncExprFromMsg
(
pQueryMsg
,
pQueryMsg
->
numOfOutput
,
&
param
.
pExprs
,
param
.
pExprMsg
,
param
.
pTagColumnInfo
))
!=
TSDB_CODE_SUCCESS
)
{
goto
_over
;
}
if
(
param
.
pSecExprMsg
!=
NULL
)
{
if
((
code
=
createQueryFuncExprFromMsg
(
pQueryMsg
,
pQueryMsg
->
secondStageOutput
,
&
param
.
pSecExprs
,
param
.
pSecExprMsg
,
param
.
pTagColumnInfo
))
!=
TSDB_CODE_SUCCESS
)
{
goto
_over
;
}
}
param
.
pGroupbyExpr
=
createGroupbyExprFromMsg
(
pQueryMsg
,
param
.
pGroupColIndex
,
&
code
);
if
((
param
.
pGroupbyExpr
==
NULL
&&
pQueryMsg
->
numOfGroupCols
!=
0
)
||
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_over
;
}
bool
isSTableQuery
=
false
;
STableGroupInfo
tableGroupInfo
=
{
0
};
int64_t
st
=
taosGetTimestampUs
();
if
(
TSDB_QUERY_HAS_TYPE
(
pQueryMsg
->
queryType
,
TSDB_QUERY_TYPE_TABLE_QUERY
))
{
STableIdInfo
*
id
=
taosArrayGet
(
param
.
pTableIdList
,
0
);
qDebug
(
"qmsg:%p query normal table, uid:%"
PRId64
", tid:%d"
,
pQueryMsg
,
id
->
uid
,
id
->
tid
);
if
((
code
=
tsdbGetOneTableGroup
(
tsdb
,
id
->
uid
,
pQueryMsg
->
window
.
skey
,
&
tableGroupInfo
))
!=
TSDB_CODE_SUCCESS
)
{
goto
_over
;
}
}
else
if
(
TSDB_QUERY_HAS_TYPE
(
pQueryMsg
->
queryType
,
TSDB_QUERY_TYPE_MULTITABLE_QUERY
|
TSDB_QUERY_TYPE_STABLE_QUERY
))
{
isSTableQuery
=
true
;
// also note there's possibility that only one table in the super table
if
(
!
TSDB_QUERY_HAS_TYPE
(
pQueryMsg
->
queryType
,
TSDB_QUERY_TYPE_MULTITABLE_QUERY
))
{
STableIdInfo
*
id
=
taosArrayGet
(
param
.
pTableIdList
,
0
);
// group by normal column, do not pass the group by condition to tsdb to group table into different group
int32_t
numOfGroupByCols
=
pQueryMsg
->
numOfGroupCols
;
if
(
pQueryMsg
->
numOfGroupCols
==
1
&&
!
TSDB_COL_IS_TAG
(
param
.
pGroupColIndex
->
flag
))
{
numOfGroupByCols
=
0
;
}
qDebug
(
"qmsg:%p query stable, uid:%"
PRId64
", tid:%d"
,
pQueryMsg
,
id
->
uid
,
id
->
tid
);
code
=
tsdbQuerySTableByTagCond
(
tsdb
,
id
->
uid
,
pQueryMsg
->
window
.
skey
,
param
.
tagCond
,
pQueryMsg
->
tagCondLen
,
pQueryMsg
->
tagNameRelType
,
param
.
tbnameCond
,
&
tableGroupInfo
,
param
.
pGroupColIndex
,
numOfGroupByCols
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"qmsg:%p failed to query stable, reason: %s"
,
pQueryMsg
,
tstrerror
(
code
));
goto
_over
;
}
}
else
{
code
=
tsdbGetTableGroupFromIdList
(
tsdb
,
param
.
pTableIdList
,
&
tableGroupInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_over
;
}
qDebug
(
"qmsg:%p query on %"
PRIzu
" tables in one group from client"
,
pQueryMsg
,
tableGroupInfo
.
numOfTables
);
}
int64_t
el
=
taosGetTimestampUs
()
-
st
;
qDebug
(
"qmsg:%p tag filter completed, numOfTables:%"
PRIzu
", elapsed time:%"
PRId64
"us"
,
pQueryMsg
,
tableGroupInfo
.
numOfTables
,
el
);
}
else
{
assert
(
0
);
}
code
=
checkForQueryBuf
(
tableGroupInfo
.
numOfTables
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
// not enough query buffer, abort
goto
_over
;
}
(
*
pQInfo
)
=
createQInfoImpl
(
pQueryMsg
,
param
.
pGroupbyExpr
,
param
.
pExprs
,
param
.
pSecExprs
,
&
tableGroupInfo
,
param
.
pTagColumnInfo
,
isSTableQuery
,
param
.
sql
);
param
.
sql
=
NULL
;
param
.
pExprs
=
NULL
;
param
.
pSecExprs
=
NULL
;
param
.
pGroupbyExpr
=
NULL
;
param
.
pTagColumnInfo
=
NULL
;
if
((
*
pQInfo
)
==
NULL
)
{
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
goto
_over
;
}
code
=
initQInfo
(
pQueryMsg
,
tsdb
,
vgId
,
*
pQInfo
,
&
param
,
isSTableQuery
);
_over:
if
(
param
.
pGroupbyExpr
!=
NULL
)
{
taosArrayDestroy
(
param
.
pGroupbyExpr
->
columnInfo
);
}
taosArrayDestroy
(
param
.
pTableIdList
);
param
.
pTableIdList
=
NULL
;
freeParam
(
&
param
);
for
(
int32_t
i
=
0
;
i
<
pQueryMsg
->
numOfCols
;
i
++
)
{
SColumnInfo
*
column
=
pQueryMsg
->
colList
+
i
;
freeColumnFilterInfo
(
column
->
filters
,
column
->
numOfFilters
);
}
//pQInfo already freed in initQInfo, but *pQInfo may not pointer to null;
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
*
pQInfo
=
NULL
;
}
// if failed to add ref for all tables in this query, abort current query
return
code
;
}
bool
qTableQuery
(
qinfo_t
qinfo
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qinfo
;
assert
(
pQInfo
&&
pQInfo
->
signature
==
pQInfo
);
int64_t
threadId
=
taosGetSelfPthreadId
();
int64_t
curOwner
=
0
;
if
((
curOwner
=
atomic_val_compare_exchange_64
(
&
pQInfo
->
owner
,
0
,
threadId
))
!=
0
)
{
qError
(
"QInfo:%p qhandle is now executed by thread:%p"
,
pQInfo
,
(
void
*
)
curOwner
);
pQInfo
->
code
=
TSDB_CODE_QRY_IN_EXEC
;
return
false
;
}
pQInfo
->
startExecTs
=
taosGetTimestampSec
();
if
(
isQueryKilled
(
pQInfo
))
{
qDebug
(
"QInfo:%p it is already killed, abort"
,
pQInfo
);
return
doBuildResCheck
(
pQInfo
);
}
if
(
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
==
0
)
{
qDebug
(
"QInfo:%p no table exists for query, abort"
,
pQInfo
);
setQueryStatus
(
pQInfo
->
runtimeEnv
.
pQuery
,
QUERY_COMPLETED
);
return
doBuildResCheck
(
pQInfo
);
}
// error occurs, record the error code and return to client
int32_t
ret
=
setjmp
(
pQInfo
->
runtimeEnv
.
env
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
pQInfo
->
code
=
ret
;
qDebug
(
"QInfo:%p query abort due to error/cancel occurs, code:%s"
,
pQInfo
,
tstrerror
(
pQInfo
->
code
));
return
doBuildResCheck
(
pQInfo
);
}
qDebug
(
"QInfo:%p query task is launched"
,
pQInfo
);
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
if
(
onlyQueryTags
(
pQInfo
->
runtimeEnv
.
pQuery
))
{
assert
(
pQInfo
->
runtimeEnv
.
pQueryHandle
==
NULL
);
buildTagQueryResult
(
pQInfo
);
}
else
if
(
pQInfo
->
runtimeEnv
.
stableQuery
)
{
stableQueryImpl
(
pQInfo
);
}
else
if
(
pQInfo
->
runtimeEnv
.
queryBlockDist
){
buildTableBlockDistResult
(
pQInfo
);
}
else
{
tableQueryImpl
(
pQInfo
);
}
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
if
(
isQueryKilled
(
pQInfo
))
{
qDebug
(
"QInfo:%p query is killed"
,
pQInfo
);
}
else
if
(
pQuery
->
rec
.
rows
==
0
)
{
qDebug
(
"QInfo:%p over, %"
PRIzu
" tables queried, %"
PRId64
" rows are returned"
,
pQInfo
,
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
,
pQuery
->
rec
.
total
);
}
else
{
qDebug
(
"QInfo:%p query paused, %"
PRId64
" rows returned, numOfTotal:%"
PRId64
" rows"
,
pQInfo
,
pQuery
->
rec
.
rows
,
pQuery
->
rec
.
total
+
pQuery
->
rec
.
rows
);
}
return
doBuildResCheck
(
pQInfo
);
}
int32_t
qRetrieveQueryResultInfo
(
qinfo_t
qinfo
,
bool
*
buildRes
,
void
*
pRspContext
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qinfo
;
if
(
pQInfo
==
NULL
||
!
isValidQInfo
(
pQInfo
))
{
qError
(
"QInfo:%p invalid qhandle"
,
pQInfo
);
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
*
buildRes
=
false
;
if
(
IS_QUERY_KILLED
(
pQInfo
))
{
qDebug
(
"QInfo:%p query is killed, code:0x%08x"
,
pQInfo
,
pQInfo
->
code
);
return
pQInfo
->
code
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
tsRetrieveBlockingModel
)
{
pQInfo
->
rspContext
=
pRspContext
;
tsem_wait
(
&
pQInfo
->
ready
);
*
buildRes
=
true
;
code
=
pQInfo
->
code
;
}
else
{
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
pthread_mutex_lock
(
&
pQInfo
->
lock
);
assert
(
pQInfo
->
rspContext
==
NULL
);
if
(
pQInfo
->
dataReady
==
QUERY_RESULT_READY
)
{
*
buildRes
=
true
;
qDebug
(
"QInfo:%p retrieve result info, rowsize:%d, rows:%"
PRId64
", code:%s"
,
pQInfo
,
pQuery
->
resultRowSize
,
pQuery
->
rec
.
rows
,
tstrerror
(
pQInfo
->
code
));
}
else
{
*
buildRes
=
false
;
qDebug
(
"QInfo:%p retrieve req set query return result after paused"
,
pQInfo
);
pQInfo
->
rspContext
=
pRspContext
;
assert
(
pQInfo
->
rspContext
!=
NULL
);
}
code
=
pQInfo
->
code
;
pthread_mutex_unlock
(
&
pQInfo
->
lock
);
}
return
code
;
}
int32_t
qDumpRetrieveResult
(
qinfo_t
qinfo
,
SRetrieveTableRsp
**
pRsp
,
int32_t
*
contLen
,
bool
*
continueExec
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qinfo
;
if
(
pQInfo
==
NULL
||
!
isValidQInfo
(
pQInfo
))
{
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
size_t
size
=
getResultSize
(
pQInfo
,
&
pQuery
->
rec
.
rows
);
size
+=
sizeof
(
int32_t
);
size
+=
sizeof
(
STableIdInfo
)
*
taosHashGetSize
(
pQInfo
->
arrTableIdInfo
);
*
contLen
=
(
int32_t
)(
size
+
sizeof
(
SRetrieveTableRsp
));
// current solution only avoid crash, but cannot return error code to client
*
pRsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
*
contLen
);
if
(
*
pRsp
==
NULL
)
{
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
(
*
pRsp
)
->
numOfRows
=
htonl
((
int32_t
)
pQuery
->
rec
.
rows
);
if
(
pQInfo
->
code
==
TSDB_CODE_SUCCESS
)
{
(
*
pRsp
)
->
offset
=
htobe64
(
pQuery
->
limit
.
offset
);
(
*
pRsp
)
->
useconds
=
htobe64
(
pRuntimeEnv
->
summary
.
elapsedTime
);
}
else
{
(
*
pRsp
)
->
offset
=
0
;
(
*
pRsp
)
->
useconds
=
htobe64
(
pRuntimeEnv
->
summary
.
elapsedTime
);
}
(
*
pRsp
)
->
precision
=
htons
(
pQuery
->
precision
);
if
(
pQuery
->
rec
.
rows
>
0
&&
pQInfo
->
code
==
TSDB_CODE_SUCCESS
)
{
doDumpQueryResult
(
pQInfo
,
(
*
pRsp
)
->
data
);
}
else
{
setQueryStatus
(
pQuery
,
QUERY_OVER
);
}
pQInfo
->
rspContext
=
NULL
;
pQInfo
->
dataReady
=
QUERY_RESULT_NOT_READY
;
if
(
IS_QUERY_KILLED
(
pQInfo
)
||
Q_STATUS_EQUAL
(
pQuery
->
status
,
QUERY_OVER
))
{
// here current thread hold the refcount, so it is safe to free tsdbQueryHandle.
*
continueExec
=
false
;
(
*
pRsp
)
->
completed
=
1
;
// notify no more result to client
}
else
{
*
continueExec
=
true
;
qDebug
(
"QInfo:%p has more results to retrieve"
,
pQInfo
);
}
return
pQInfo
->
code
;
}
void
*
qGetResultRetrieveMsg
(
qinfo_t
qinfo
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qinfo
;
assert
(
pQInfo
!=
NULL
);
return
pQInfo
->
rspContext
;
}
int32_t
qKillQuery
(
qinfo_t
qinfo
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qinfo
;
if
(
pQInfo
==
NULL
||
!
isValidQInfo
(
pQInfo
))
{
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
setQueryKilled
(
pQInfo
);
// Wait for the query executing thread being stopped/
// Once the query is stopped, the owner of qHandle will be cleared immediately.
while
(
pQInfo
->
owner
!=
0
)
{
taosMsleep
(
100
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
qQueryCompleted
(
qinfo_t
qinfo
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qinfo
;
if
(
pQInfo
==
NULL
||
!
isValidQInfo
(
pQInfo
))
{
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
return
isQueryKilled
(
pQInfo
)
||
Q_STATUS_EQUAL
(
pQuery
->
status
,
QUERY_OVER
);
}
void
qDestroyQueryInfo
(
qinfo_t
qHandle
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qHandle
;
if
(
!
isValidQInfo
(
pQInfo
))
{
return
;
}
qDebug
(
"QInfo:%p query completed"
,
pQInfo
);
queryCostStatis
(
pQInfo
);
// print the query cost summary
freeQInfo
(
pQInfo
);
}
void
*
qOpenQueryMgmt
(
int32_t
vgId
)
{
const
int32_t
refreshHandleInterval
=
30
;
// every 30 seconds, refresh handle pool
char
cacheName
[
128
]
=
{
0
};
sprintf
(
cacheName
,
"qhandle_%d"
,
vgId
);
SQueryMgmt
*
pQueryMgmt
=
calloc
(
1
,
sizeof
(
SQueryMgmt
));
if
(
pQueryMgmt
==
NULL
)
{
terrno
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
NULL
;
}
pQueryMgmt
->
qinfoPool
=
taosCacheInit
(
TSDB_CACHE_PTR_KEY
,
refreshHandleInterval
,
true
,
freeqinfoFn
,
cacheName
);
pQueryMgmt
->
closed
=
false
;
pQueryMgmt
->
vgId
=
vgId
;
pthread_mutex_init
(
&
pQueryMgmt
->
lock
,
NULL
);
qDebug
(
"vgId:%d, open querymgmt success"
,
vgId
);
return
pQueryMgmt
;
}
void
qQueryMgmtNotifyClosed
(
void
*
pQMgmt
)
{
if
(
pQMgmt
==
NULL
)
{
return
;
}
SQueryMgmt
*
pQueryMgmt
=
pQMgmt
;
qDebug
(
"vgId:%d, set querymgmt closed, wait for all queries cancelled"
,
pQueryMgmt
->
vgId
);
pthread_mutex_lock
(
&
pQueryMgmt
->
lock
);
pQueryMgmt
->
closed
=
true
;
pthread_mutex_unlock
(
&
pQueryMgmt
->
lock
);
taosCacheRefresh
(
pQueryMgmt
->
qinfoPool
,
queryMgmtKillQueryFn
);
}
void
qQueryMgmtReOpen
(
void
*
pQMgmt
)
{
if
(
pQMgmt
==
NULL
)
{
return
;
}
SQueryMgmt
*
pQueryMgmt
=
pQMgmt
;
qDebug
(
"vgId:%d, set querymgmt reopen"
,
pQueryMgmt
->
vgId
);
pthread_mutex_lock
(
&
pQueryMgmt
->
lock
);
pQueryMgmt
->
closed
=
false
;
pthread_mutex_unlock
(
&
pQueryMgmt
->
lock
);
}
void
qCleanupQueryMgmt
(
void
*
pQMgmt
)
{
if
(
pQMgmt
==
NULL
)
{
return
;
}
SQueryMgmt
*
pQueryMgmt
=
pQMgmt
;
int32_t
vgId
=
pQueryMgmt
->
vgId
;
assert
(
pQueryMgmt
->
closed
);
SCacheObj
*
pqinfoPool
=
pQueryMgmt
->
qinfoPool
;
pQueryMgmt
->
qinfoPool
=
NULL
;
taosCacheCleanup
(
pqinfoPool
);
pthread_mutex_destroy
(
&
pQueryMgmt
->
lock
);
tfree
(
pQueryMgmt
);
qDebug
(
"vgId:%d, queryMgmt cleanup completed"
,
vgId
);
}
void
**
qRegisterQInfo
(
void
*
pMgmt
,
uint64_t
qInfo
)
{
if
(
pMgmt
==
NULL
)
{
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
return
NULL
;
}
SQueryMgmt
*
pQueryMgmt
=
pMgmt
;
if
(
pQueryMgmt
->
qinfoPool
==
NULL
)
{
qError
(
"QInfo:%p failed to add qhandle into qMgmt, since qMgmt is closed"
,
(
void
*
)
qInfo
);
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
return
NULL
;
}
pthread_mutex_lock
(
&
pQueryMgmt
->
lock
);
if
(
pQueryMgmt
->
closed
)
{
pthread_mutex_unlock
(
&
pQueryMgmt
->
lock
);
qError
(
"QInfo:%p failed to add qhandle into cache, since qMgmt is colsing"
,
(
void
*
)
qInfo
);
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
return
NULL
;
}
else
{
TSDB_CACHE_PTR_TYPE
handleVal
=
(
TSDB_CACHE_PTR_TYPE
)
qInfo
;
void
**
handle
=
taosCachePut
(
pQueryMgmt
->
qinfoPool
,
&
handleVal
,
sizeof
(
TSDB_CACHE_PTR_TYPE
),
&
qInfo
,
sizeof
(
TSDB_CACHE_PTR_TYPE
),
(
getMaximumIdleDurationSec
()
*
1000
));
pthread_mutex_unlock
(
&
pQueryMgmt
->
lock
);
return
handle
;
}
}
void
**
qAcquireQInfo
(
void
*
pMgmt
,
uint64_t
_key
)
{
SQueryMgmt
*
pQueryMgmt
=
pMgmt
;
if
(
pQueryMgmt
->
closed
)
{
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
return
NULL
;
}
if
(
pQueryMgmt
->
qinfoPool
==
NULL
)
{
terrno
=
TSDB_CODE_QRY_INVALID_QHANDLE
;
return
NULL
;
}
TSDB_CACHE_PTR_TYPE
key
=
(
TSDB_CACHE_PTR_TYPE
)
_key
;
void
**
handle
=
taosCacheAcquireByKey
(
pQueryMgmt
->
qinfoPool
,
&
key
,
sizeof
(
TSDB_CACHE_PTR_TYPE
));
if
(
handle
==
NULL
||
*
handle
==
NULL
)
{
terrno
=
TSDB_CODE_QRY_INVALID_QHANDLE
;
return
NULL
;
}
else
{
return
handle
;
}
}
void
**
qReleaseQInfo
(
void
*
pMgmt
,
void
*
pQInfo
,
bool
freeHandle
)
{
SQueryMgmt
*
pQueryMgmt
=
pMgmt
;
if
(
pQueryMgmt
->
qinfoPool
==
NULL
)
{
return
NULL
;
}
taosCacheRelease
(
pQueryMgmt
->
qinfoPool
,
pQInfo
,
freeHandle
);
return
0
;
}
src/util/inc/tarray.h
浏览文件 @
36c9ba0c
...
...
@@ -125,7 +125,7 @@ void taosArrayRemove(SArray* pArray, size_t index);
* @param pDst
* @param pSrc
*/
void
taosArrayCopy
(
SArray
*
pDst
,
const
SArray
*
pSrc
);
SArray
*
taosArrayFromList
(
const
void
*
src
,
size_t
size
,
size_t
elemSize
);
/**
* clone a new array
...
...
src/util/src/tarray.c
浏览文件 @
36c9ba0c
...
...
@@ -156,23 +156,14 @@ void taosArrayRemove(SArray* pArray, size_t index) {
pArray
->
size
-=
1
;
}
void
taosArrayCopy
(
SArray
*
pDst
,
const
SArray
*
pSrc
)
{
assert
(
pSrc
!=
NULL
&&
pDst
!=
NULL
);
if
(
pDst
->
capacity
<
pSrc
->
size
)
{
void
*
pData
=
realloc
(
pDst
->
pData
,
pSrc
->
size
*
pSrc
->
elemSize
);
if
(
pData
==
NULL
)
{
// todo handle oom
}
else
{
pDst
->
pData
=
pData
;
pDst
->
capacity
=
pSrc
->
size
;
}
}
memcpy
(
pDst
->
pData
,
pSrc
->
pData
,
pSrc
->
elemSize
*
pSrc
->
size
);
pDst
->
elemSize
=
pSrc
->
elemSize
;
pDst
->
capacity
=
pSrc
->
size
;
pDst
->
size
=
pSrc
->
size
;
SArray
*
taosArrayFromList
(
const
void
*
src
,
size_t
size
,
size_t
elemSize
)
{
assert
(
src
!=
NULL
&&
elemSize
>
0
);
SArray
*
pDst
=
taosArrayInit
(
size
,
elemSize
);
memcpy
(
pDst
->
pData
,
src
,
elemSize
*
size
);
pDst
->
size
=
size
;
return
pDst
;
}
SArray
*
taosArrayDup
(
const
SArray
*
pSrc
)
{
...
...
tests/script/general/parser/limit2_query.sim
浏览文件 @
36c9ba0c
...
...
@@ -143,6 +143,97 @@ if $data11 != -1 then
return -1
endi
sql select max(c1) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(value, -1000, -2) limit 8200
if $rows != 8200 then
return -1
endi
sql select max(c1) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(value, -1000, -2) limit 10 offset 8190;
if $rows != 10 then
return -1
endi
if $data00 != @18-10-15 19:30:00.000@ then
return -1
endi
if $data01 != 5 then
return -1
endi
if $data10 != @18-10-15 19:35:00.000@ then
return -1
endi
if $data11 != -1000 then
return -1
endi
if $data20 != @18-10-15 19:40:00.000@ then
return -1
endi
if $data21 != 6 then
return -1
endi
if $data30 != @18-10-15 19:45:00.000@ then
return -1
endi
if $data31 != -1000 then
return -1
endi
sql select max(c1) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(value, -1000, -2) limit 10 offset 10001;
if $rows != 10 then
return -1
endi
if $data00 != @18-10-22 02:25:00.000@ then
return -1
endi
if $data01 != -1000 then
return -1
endi
if $data10 != @18-10-22 02:30:00.000@ then
return -1
endi
if $data11 != 1 then
return -1
endi
if $data20 != @18-10-22 02:35:00.000@ then
return -1
endi
if $data21 != -1000 then
return -1
endi
if $data30 != @18-10-22 02:40:00.000@ then
return -1
endi
if $data31 != 2 then
return -1
endi
sql select max(c1) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(value, -1000, -2) limit 10000 offset 10001;
print ====> needs to validate the last row result
if $rows != 9998 then
return -1
endi
sql select max(c1) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(value, -1000, -2) limit 100 offset 20001;
if $rows != 0 then
return -1
endi
# tb + interval + fill(linear) + limit offset
$limit = $rowNum
$offset = $limit / 2
...
...
tests/script/general/parser/testSuite.sim
浏览文件 @
36c9ba0c
...
...
@@ -53,32 +53,32 @@
#run general/parser/limit1_tblocks100.sim
#sleep 100
#run general/parser/limit2.sim
#
sleep 100
#
run general/parser/mixed_blocks.sim
#
sleep 100
#
run general/parser/nchar.sim
#
sleep 100
#
run general/parser/null_char.sim
#
sleep 100
#
run general/parser/selectResNum.sim
#
sleep 100
#
run general/parser/select_across_vnodes.sim
#
sleep 100
#
run general/parser/select_from_cache_disk.sim
#
sleep 100
#
run general/parser/set_tag_vals.sim
#
sleep 100
#
run general/parser/single_row_in_tb.sim
#
sleep 100
#
run general/parser/slimit.sim
#
sleep 100
#
run general/parser/slimit1.sim
#
sleep 100
#
run general/parser/slimit_alter_tags.sim
#
sleep 100
#
run general/parser/tbnameIn.sim
#
sleep 100
#
run general/parser/slimit_alter_tags.sim # persistent failed
sleep 100
run general/parser/mixed_blocks.sim
sleep 100
run general/parser/nchar.sim
sleep 100
run general/parser/null_char.sim
sleep 100
run general/parser/selectResNum.sim
sleep 100
run general/parser/select_across_vnodes.sim
sleep 100
run general/parser/select_from_cache_disk.sim
sleep 100
run general/parser/set_tag_vals.sim
sleep 100
run general/parser/single_row_in_tb.sim
sleep 100
run general/parser/slimit.sim
sleep 100
run general/parser/slimit1.sim
sleep 100
run general/parser/slimit_alter_tags.sim
sleep 100
run general/parser/tbnameIn.sim
sleep 100
run general/parser/slimit_alter_tags.sim # persistent failed
sleep 100
run general/parser/join.sim
sleep 100
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录