Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
61d7d5c3
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
61d7d5c3
编写于
12月 15, 2020
作者:
H
haojun Liao
提交者:
GitHub
12月 15, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #4551 from taosdata/feature/query
Feature/query
上级
7d134b04
ab5b23d8
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
185 addition
and
150 deletion
+185
-150
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+2
-2
src/client/src/tscFunctionImpl.c
src/client/src/tscFunctionImpl.c
+2
-1
src/client/src/tscLocal.c
src/client/src/tscLocal.c
+26
-15
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+1
-1
src/client/src/tscServer.c
src/client/src/tscServer.c
+34
-5
src/client/src/tscSubquery.c
src/client/src/tscSubquery.c
+0
-4
src/inc/ttype.h
src/inc/ttype.h
+14
-13
src/query/inc/qUtil.h
src/query/inc/qUtil.h
+21
-24
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+46
-53
src/query/src/qUtil.c
src/query/src/qUtil.c
+27
-28
tests/script/general/parser/fill.sim
tests/script/general/parser/fill.sim
+12
-4
未找到文件。
src/client/inc/tsclient.h
浏览文件 @
61d7d5c3
...
...
@@ -285,8 +285,8 @@ typedef struct {
char
**
buffer
;
// Buffer used to put multibytes encoded using unicode (wchar_t)
SColumnIndex
*
pColumnIndex
;
SArithmeticSupport
*
pArithSup
;
// support the arithmetic expression calculation on agg functions
struct
SLocalReducer
*
pLocalReducer
;
SArithmeticSupport
*
pArithSup
;
// support the arithmetic expression calculation on agg functions
struct
SLocalReducer
*
pLocalReducer
;
}
SSqlRes
;
typedef
struct
STscObj
{
...
...
src/client/src/tscFunctionImpl.c
浏览文件 @
61d7d5c3
...
...
@@ -2222,7 +2222,8 @@ static void buildTopBotStruct(STopBotInfo *pTopBotInfo, SQLFunctionCtx *pCtx) {
tmp
+=
POINTER_BYTES
*
pCtx
->
param
[
0
].
i64Key
;
size_t
size
=
sizeof
(
tValuePair
)
+
pCtx
->
tagInfo
.
tagsLen
;
// assert(pCtx->param[0].i64Key > 0);
for
(
int32_t
i
=
0
;
i
<
pCtx
->
param
[
0
].
i64Key
;
++
i
)
{
pTopBotInfo
->
res
[
i
]
=
(
tValuePair
*
)
tmp
;
pTopBotInfo
->
res
[
i
]
->
pTags
=
tmp
+
sizeof
(
tValuePair
);
...
...
src/client/src/tscLocal.c
浏览文件 @
61d7d5c3
...
...
@@ -46,7 +46,8 @@ typedef struct SCreateBuilder {
SSqlObj
*
pInterSql
;
int32_t
(
*
fp
)(
void
*
para
,
char
*
result
);
Stage
callStage
;
}
SCreateBuilder
;
}
SCreateBuilder
;
static
void
tscSetLocalQueryResult
(
SSqlObj
*
pSql
,
const
char
*
val
,
const
char
*
columnName
,
int16_t
type
,
size_t
valueLength
);
static
int32_t
tscSetValueToResObj
(
SSqlObj
*
pSql
,
int32_t
rowLen
)
{
...
...
@@ -207,10 +208,7 @@ static int32_t tscProcessDescribeTable(SSqlObj *pSql) {
const
int32_t
TYPE_COLUMN_LENGTH
=
16
;
const
int32_t
NOTE_COLUMN_MIN_LENGTH
=
8
;
int32_t
noteFieldLen
=
NOTE_COLUMN_MIN_LENGTH
;
//tscMaxLengthOfTagsFields(pSql);
// if (noteFieldLen == 0) {
// noteFieldLen = NOTE_COLUMN_MIN_LENGTH;
// }
int32_t
noteFieldLen
=
NOTE_COLUMN_MIN_LENGTH
;
int32_t
rowLen
=
tscBuildTableSchemaResultFields
(
pSql
,
NUM_OF_DESC_TABLE_COLUMNS
,
TYPE_COLUMN_LENGTH
,
noteFieldLen
);
tscFieldInfoUpdateOffset
(
pQueryInfo
);
...
...
@@ -822,26 +820,39 @@ static int32_t tscProcessClientVer(SSqlObj *pSql) {
}
// TODO add test cases.
static
int32_t
checkForOnlineNode
(
SSqlObj
*
pSql
)
{
int32_t
*
data
=
pSql
->
res
.
length
;
if
(
data
==
NULL
)
{
return
TSDB_CODE_SUCCESS
;
}
int32_t
total
=
data
[
0
];
int32_t
online
=
data
[
1
];
return
(
online
<
total
)
?
TSDB_CODE_RPC_NETWORK_UNAVAIL
:
TSDB_CODE_SUCCESS
;
}
static
int32_t
tscProcessServStatus
(
SSqlObj
*
pSql
)
{
STscObj
*
pObj
=
pSql
->
pTscObj
;
SSqlObj
*
pHb
=
(
SSqlObj
*
)
taosAcquireRef
(
tscObjRef
,
pObj
->
hbrid
);
if
(
pHb
!=
NULL
)
{
int32_t
code
=
pHb
->
res
.
code
;
pSql
->
res
.
code
=
pHb
->
res
.
code
;
taosReleaseRef
(
tscObjRef
,
pObj
->
hbrid
);
if
(
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
)
{
pSql
->
res
.
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
;
return
pSql
->
res
.
code
;
}
}
else
{
if
(
pSql
->
res
.
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
)
{
return
pSql
->
res
.
code
;
}
}
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
if
(
pSql
->
res
.
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
)
{
return
pSql
->
res
.
code
;
}
pSql
->
res
.
code
=
checkForOnlineNode
(
pHb
);
if
(
pSql
->
res
.
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
)
{
return
pSql
->
res
.
code
;
}
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
SSqlExpr
*
pExpr
=
taosArrayGetP
(
pQueryInfo
->
exprList
,
0
);
int32_t
val
=
1
;
tscSetLocalQueryResult
(
pSql
,
(
char
*
)
&
val
,
pExpr
->
aliasName
,
TSDB_DATA_TYPE_INT
,
sizeof
(
int32_t
));
return
TSDB_CODE_SUCCESS
;
...
...
src/client/src/tscSQLParser.c
浏览文件 @
61d7d5c3
...
...
@@ -6562,7 +6562,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg5
);
}
if
(
pQueryInfo
->
interval
.
interval
>
0
&&
pQueryInfo
->
interval
.
intervalUnit
!=
'n'
&&
pQueryInfo
->
interval
.
intervalUnit
!=
'y'
)
{
if
(
pQueryInfo
->
interval
.
interval
>
0
)
{
bool
initialWindows
=
TSWINDOW_IS_EQUAL
(
pQueryInfo
->
window
,
TSWINDOW_INITIALIZER
);
if
(
initialWindows
)
{
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg6
);
...
...
src/client/src/tscServer.c
浏览文件 @
61d7d5c3
...
...
@@ -147,15 +147,15 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
SSqlObj
*
pSql
=
tres
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
if
(
code
==
0
)
{
if
(
code
==
TSDB_CODE_SUCCESS
)
{
SHeartBeatRsp
*
pRsp
=
(
SHeartBeatRsp
*
)
pRes
->
pRsp
;
SRpcEpSet
*
epSet
=
&
pRsp
->
epSet
;
SRpcEpSet
*
epSet
=
&
pRsp
->
epSet
;
if
(
epSet
->
numOfEps
>
0
)
{
tscEpSetHtons
(
epSet
);
if
(
!
tscEpSetIsEqual
(
&
pSql
->
pTscObj
->
tscCorMgmtEpSet
->
epSet
,
epSet
))
{
tscTrace
(
"%p updating epset: numOfEps: %d, inUse: %d"
,
pSql
,
epSet
->
numOfEps
,
epSet
->
inUse
);
for
(
int8_t
i
=
0
;
i
<
epSet
->
numOfEps
;
i
++
)
{
tscTrace
(
"endpoint %d: fqdn
=
%s, port=%d"
,
i
,
epSet
->
fqdn
[
i
],
epSet
->
port
[
i
]);
tscTrace
(
"endpoint %d: fqdn
=
%s, port=%d"
,
i
,
epSet
->
fqdn
[
i
],
epSet
->
port
[
i
]);
}
tscUpdateMgmtEpSet
(
pSql
,
epSet
);
}
...
...
@@ -167,11 +167,40 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
tscKillConnection
(
pObj
);
return
;
}
else
{
if
(
pRsp
->
queryId
)
tscKillQuery
(
pObj
,
htonl
(
pRsp
->
queryId
));
if
(
pRsp
->
streamId
)
tscKillStream
(
pObj
,
htonl
(
pRsp
->
streamId
));
if
(
pRsp
->
queryId
)
{
tscKillQuery
(
pObj
,
htonl
(
pRsp
->
queryId
));
}
if
(
pRsp
->
streamId
)
{
tscKillStream
(
pObj
,
htonl
(
pRsp
->
streamId
));
}
}
int32_t
total
=
htonl
(
pRsp
->
totalDnodes
);
int32_t
online
=
htonl
(
pRsp
->
onlineDnodes
);
assert
(
online
<=
total
);
if
(
online
<
total
)
{
tscError
(
"HB:%p, total dnode:%d, online dnode:%d"
,
pSql
,
total
,
online
);
pSql
->
res
.
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
;
}
if
(
pRes
->
buffer
==
NULL
)
{
pRes
->
length
=
calloc
(
2
,
sizeof
(
int32_t
));
}
pRes
->
length
[
0
]
=
total
;
pRes
->
length
[
1
]
=
online
;
}
else
{
tscDebug
(
"%"
PRId64
" heartbeat failed, code:%s"
,
pObj
->
hbrid
,
tstrerror
(
code
));
if
(
pRes
->
buffer
==
NULL
)
{
pRes
->
length
=
calloc
(
2
,
sizeof
(
int32_t
));
}
pRes
->
length
[
1
]
=
0
;
if
(
pRes
->
length
[
0
]
==
0
)
{
pRes
->
length
[
0
]
=
1
;
// make sure that the value of the total node is greater than the online node
}
}
if
(
pObj
->
hbrid
!=
0
)
{
...
...
src/client/src/tscSubquery.c
浏览文件 @
61d7d5c3
...
...
@@ -2279,7 +2279,6 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
*/
int32_t
tscHandleInsertRetry
(
SSqlObj
*
pParent
,
SSqlObj
*
pSql
)
{
assert
(
pSql
!=
NULL
&&
pSql
->
param
!=
NULL
);
// SSqlCmd* pCmd = &pSql->cmd;
SSqlRes
*
pRes
=
&
pSql
->
res
;
SInsertSupporter
*
pSupporter
=
(
SInsertSupporter
*
)
pSql
->
param
;
...
...
@@ -2288,9 +2287,6 @@ int32_t tscHandleInsertRetry(SSqlObj* pParent, SSqlObj* pSql) {
STableDataBlocks
*
pTableDataBlock
=
taosArrayGetP
(
pParent
->
cmd
.
pDataBlocks
,
pSupporter
->
index
);
int32_t
code
=
tscCopyDataBlockToPayload
(
pSql
,
pTableDataBlock
);
// free the data block created from insert sql string
// pCmd->pDataBlocks = tscDestroyBlockArrayList(pParent->cmd.pDataBlocks);
if
((
pRes
->
code
=
code
)
!=
TSDB_CODE_SUCCESS
)
{
tscQueueAsyncRes
(
pSql
);
return
code
;
// here the pSql may have been released already.
...
...
src/inc/ttype.h
浏览文件 @
61d7d5c3
...
...
@@ -8,25 +8,26 @@ extern "C" {
#include "taosdef.h"
#define GET_TYPED_DATA(_v, _finalType, _type, _data) \
switch (_type) { \
case TSDB_DATA_TYPE_TINYINT: \
switch (_type) { \
case TSDB_DATA_TYPE_BOOL: \
case TSDB_DATA_TYPE_TINYINT: \
(_v) = (_finalType)GET_INT8_VAL(_data); \
break; \
case TSDB_DATA_TYPE_SMALLINT: \
break;
\
case TSDB_DATA_TYPE_SMALLINT:
\
(_v) = (_finalType)GET_INT16_VAL(_data); \
break; \
case TSDB_DATA_TYPE_BIGINT: \
break;
\
case TSDB_DATA_TYPE_BIGINT:
\
(_v) = (_finalType)(GET_INT64_VAL(_data)); \
break; \
case TSDB_DATA_TYPE_FLOAT: \
break;
\
case TSDB_DATA_TYPE_FLOAT:
\
(_v) = (_finalType)GET_FLOAT_VAL(_data); \
break; \
case TSDB_DATA_TYPE_DOUBLE: \
break;
\
case TSDB_DATA_TYPE_DOUBLE:
\
(_v) = (_finalType)GET_DOUBLE_VAL(_data); \
break; \
default: \
break;
\
default:
\
(_v) = (_finalType)GET_INT32_VAL(_data); \
break; \
break;
\
};
#ifdef __cplusplus
...
...
src/query/inc/qUtil.h
浏览文件 @
61d7d5c3
...
...
@@ -24,36 +24,35 @@
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pExpr1[1].base.arg->argValue.i64:1)
int32_t
getOutputInterResultBufSize
(
SQuery
*
pQuery
);
void
clearResultRow
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pRow
,
int16_t
type
);
void
copyResultRow
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
dst
,
const
SResultRow
*
src
,
int16_t
type
);
SResultRowCellInfo
*
getResultCell
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
const
SResultRow
*
pRow
,
int32_t
index
);
size_t
getResultRowSize
(
SQueryRuntimeEnv
*
pRuntimeEnv
);
int32_t
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
size
,
int16_t
type
);
void
cleanupResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
);
int32_t
initWindowResInfo
(
SResultRowInfo
*
pWindowResInfo
,
int32_t
size
,
int16_t
type
);
void
resetResultRowInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
);
void
popFrontResultRow
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
,
int32_t
num
);
void
clearClosedResultRows
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
);
int32_t
numOfClosedResultRows
(
SResultRowInfo
*
pResultRowInfo
);
void
closeAllResultRows
(
SResultRowInfo
*
pResultRowInfo
);
void
removeRedundantResultRows
(
SResultRowInfo
*
pResultRowInfo
,
TSKEY
lastKey
,
int32_t
order
);
void
cleanupTimeWindowInfo
(
SResultRowInfo
*
pWindowResInfo
);
void
resetTimeWindowInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pWindowResInfo
);
void
clearFirstNWindowRes
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
num
);
int32_t
initResultRow
(
SResultRow
*
pResultRow
);
void
closeResultRow
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
);
bool
isResultRowClosed
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
);
void
clearResultRow
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pResultRow
,
int16_t
type
);
void
copyResultRow
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
dst
,
const
SResultRow
*
src
,
int16_t
type
);
void
clearClosedTimeWindow
(
SQueryRuntimeEnv
*
pRuntimeEnv
);
int32_t
numOfClosedTimeWindow
(
SResultRowInfo
*
pWindowResInfo
);
void
closeTimeWindow
(
SResultRowInfo
*
pWindowResInfo
,
int32_t
slot
);
void
closeAllTimeWindow
(
SResultRowInfo
*
pWindowResInfo
);
void
removeRedundantWindow
(
SResultRowInfo
*
pWindowResInfo
,
TSKEY
lastKey
,
int32_t
order
);
SResultRowCellInfo
*
getResultCell
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
const
SResultRow
*
pRow
,
int32_t
index
);
static
FORCE_INLINE
SResultRow
*
getResultRow
(
SResultRowInfo
*
p
WindowRes
Info
,
int32_t
slot
)
{
assert
(
p
WindowResInfo
!=
NULL
&&
slot
>=
0
&&
slot
<
pWindowRes
Info
->
size
);
return
p
WindowRes
Info
->
pResult
[
slot
];
static
FORCE_INLINE
SResultRow
*
getResultRow
(
SResultRowInfo
*
p
ResultRow
Info
,
int32_t
slot
)
{
assert
(
p
ResultRowInfo
!=
NULL
&&
slot
>=
0
&&
slot
<
pResultRow
Info
->
size
);
return
p
ResultRow
Info
->
pResult
[
slot
];
}
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pExpr1[1].base.arg->argValue.i64:1)
bool
isWindowResClosed
(
SResultRowInfo
*
pWindowResInfo
,
int32_t
slot
);
int32_t
initResultRow
(
SResultRow
*
pResultRow
);
static
FORCE_INLINE
char
*
getPosInResultPage
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
columnIndex
,
SResultRow
*
pResult
,
tFilePage
*
page
)
{
assert
(
pResult
!=
NULL
&&
pRuntimeEnv
!=
NULL
);
...
...
@@ -71,8 +70,6 @@ bool notNull_filter(SColumnFilterElem *pFilter, char* minval, char* maxval);
__filter_func_t
*
getRangeFilterFuncArray
(
int32_t
type
);
__filter_func_t
*
getValueFilterFuncArray
(
int32_t
type
);
size_t
getWindowResultSize
(
SQueryRuntimeEnv
*
pRuntimeEnv
);
SResultRowPool
*
initResultRowPool
(
size_t
size
);
SResultRow
*
getNewResultRow
(
SResultRowPool
*
p
);
int64_t
getResultRowPoolMemSize
(
SResultRowPool
*
p
);
...
...
src/query/src/qExecutor.c
浏览文件 @
61d7d5c3
...
...
@@ -464,13 +464,13 @@ static bool hasNullValue(SColIndex* pColIndex, SDataStatis *pStatis, SDataStatis
return
true
;
}
static
SResultRow
*
doPrepareResultRowFromKey
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
p
WindowRes
Info
,
char
*
pData
,
static
SResultRow
*
doPrepareResultRowFromKey
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
p
ResultRow
Info
,
char
*
pData
,
int16_t
bytes
,
bool
masterscan
,
uint64_t
uid
)
{
SET_RES_WINDOW_KEY
(
pRuntimeEnv
->
keyBuf
,
pData
,
bytes
,
uid
);
int32_t
*
p1
=
(
int32_t
*
)
taosHashGet
(
pRuntimeEnv
->
pResultRowHashTable
,
pRuntimeEnv
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
if
(
p1
!=
NULL
)
{
p
WindowRes
Info
->
curIndex
=
*
p1
;
p
ResultRow
Info
->
curIndex
=
*
p1
;
}
else
{
if
(
!
masterscan
)
{
// not master scan, do not add new timewindow
return
NULL
;
...
...
@@ -478,46 +478,46 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes
// TODO refactor
// more than the capacity, reallocate the resources
if
(
p
WindowResInfo
->
size
>=
pWindowRes
Info
->
capacity
)
{
if
(
p
ResultRowInfo
->
size
>=
pResultRow
Info
->
capacity
)
{
int64_t
newCapacity
=
0
;
if
(
p
WindowRes
Info
->
capacity
>
10000
)
{
newCapacity
=
(
int64_t
)(
p
WindowRes
Info
->
capacity
*
1
.
25
);
if
(
p
ResultRow
Info
->
capacity
>
10000
)
{
newCapacity
=
(
int64_t
)(
p
ResultRow
Info
->
capacity
*
1
.
25
);
}
else
{
newCapacity
=
(
int64_t
)(
p
WindowRes
Info
->
capacity
*
1
.
5
);
newCapacity
=
(
int64_t
)(
p
ResultRow
Info
->
capacity
*
1
.
5
);
}
char
*
t
=
realloc
(
p
WindowRes
Info
->
pResult
,
(
size_t
)(
newCapacity
*
POINTER_BYTES
));
char
*
t
=
realloc
(
p
ResultRow
Info
->
pResult
,
(
size_t
)(
newCapacity
*
POINTER_BYTES
));
if
(
t
==
NULL
)
{
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
p
WindowRes
Info
->
pResult
=
(
SResultRow
**
)
t
;
p
ResultRow
Info
->
pResult
=
(
SResultRow
**
)
t
;
int32_t
inc
=
(
int32_t
)
newCapacity
-
p
WindowRes
Info
->
capacity
;
memset
(
&
p
WindowResInfo
->
pResult
[
pWindowRes
Info
->
capacity
],
0
,
POINTER_BYTES
*
inc
);
int32_t
inc
=
(
int32_t
)
newCapacity
-
p
ResultRow
Info
->
capacity
;
memset
(
&
p
ResultRowInfo
->
pResult
[
pResultRow
Info
->
capacity
],
0
,
POINTER_BYTES
*
inc
);
p
WindowRes
Info
->
capacity
=
(
int32_t
)
newCapacity
;
p
ResultRow
Info
->
capacity
=
(
int32_t
)
newCapacity
;
}
SResultRow
*
pResult
=
getNewResultRow
(
pRuntimeEnv
->
pool
);
p
WindowResInfo
->
pResult
[
pWindowRes
Info
->
size
]
=
pResult
;
p
ResultRowInfo
->
pResult
[
pResultRow
Info
->
size
]
=
pResult
;
int32_t
ret
=
initResultRow
(
pResult
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
// add a new result set for a new group
p
WindowResInfo
->
curIndex
=
pWindowRes
Info
->
size
++
;
p
ResultRowInfo
->
curIndex
=
pResultRow
Info
->
size
++
;
taosHashPut
(
pRuntimeEnv
->
pResultRowHashTable
,
pRuntimeEnv
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
),
(
char
*
)
&
p
WindowRes
Info
->
curIndex
,
sizeof
(
int32_t
));
(
char
*
)
&
p
ResultRow
Info
->
curIndex
,
sizeof
(
int32_t
));
}
// too many time window in query
if
(
p
WindowRes
Info
->
size
>
MAX_INTERVAL_TIME_WINDOW
)
{
if
(
p
ResultRow
Info
->
size
>
MAX_INTERVAL_TIME_WINDOW
)
{
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW
);
}
return
getResultRow
(
p
WindowResInfo
,
pWindowRes
Info
->
curIndex
);
return
getResultRow
(
p
ResultRowInfo
,
pResultRow
Info
->
curIndex
);
}
// get the correct time window according to the handled timestamp
...
...
@@ -614,14 +614,14 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf
return
0
;
}
static
int32_t
setWindowOutputBufByKey
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
p
WindowRes
Info
,
SDataBlockInfo
*
pBockInfo
,
static
int32_t
setWindowOutputBufByKey
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
p
ResultRow
Info
,
SDataBlockInfo
*
pBockInfo
,
STimeWindow
*
win
,
bool
masterscan
,
bool
*
newWind
,
SResultRow
**
pResult
)
{
assert
(
win
->
skey
<=
win
->
ekey
);
SDiskbasedResultBuf
*
pResultBuf
=
pRuntimeEnv
->
pResultBuf
;
// todo refactor
int64_t
uid
=
getResultInfoUId
(
pRuntimeEnv
);
SResultRow
*
pResultRow
=
doPrepareResultRowFromKey
(
pRuntimeEnv
,
p
WindowRes
Info
,
(
char
*
)
&
win
->
skey
,
TSDB_KEYSIZE
,
masterscan
,
uid
);
SResultRow
*
pResultRow
=
doPrepareResultRowFromKey
(
pRuntimeEnv
,
p
ResultRow
Info
,
(
char
*
)
&
win
->
skey
,
TSDB_KEYSIZE
,
masterscan
,
uid
);
if
(
pResultRow
==
NULL
)
{
*
newWind
=
false
;
...
...
@@ -717,7 +717,7 @@ static int32_t updateResultRowCurrentIndex(SResultRowInfo* pWindowResInfo, TSKEY
TSKEY
ekey
=
pResult
->
win
.
ekey
;
if
((
ekey
<=
lastKey
&&
ascQuery
)
||
(
pResult
->
win
.
skey
>=
lastKey
&&
!
ascQuery
))
{
close
TimeWind
ow
(
pWindowResInfo
,
i
);
close
ResultR
ow
(
pWindowResInfo
,
i
);
}
else
{
skey
=
pResult
->
win
.
skey
;
break
;
...
...
@@ -751,7 +751,7 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe
// query completed
if
((
lastKey
>=
pQuery
->
current
->
win
.
ekey
&&
ascQuery
)
||
(
lastKey
<=
pQuery
->
current
->
win
.
ekey
&&
(
!
ascQuery
)))
{
closeAll
TimeWindow
(
pWindowResInfo
);
closeAll
ResultRows
(
pWindowResInfo
);
pWindowResInfo
->
curIndex
=
pWindowResInfo
->
size
-
1
;
setQueryStatus
(
pQuery
,
QUERY_COMPLETED
|
QUERY_RESBUF_FULL
);
...
...
@@ -1351,14 +1351,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
}
int64_t
v
=
-
1
;
switch
(
type
)
{
case
TSDB_DATA_TYPE_BOOL
:
case
TSDB_DATA_TYPE_TINYINT
:
v
=
GET_INT8_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
v
=
GET_INT16_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_INT
:
v
=
GET_INT32_VAL
(
pData
);
break
;
case
TSDB_DATA_TYPE_BIGINT
:
v
=
GET_INT64_VAL
(
pData
);
break
;
}
GET_TYPED_DATA
(
v
,
int64_t
,
type
,
pData
);
if
(
type
==
TSDB_DATA_TYPE_BINARY
||
type
==
TSDB_DATA_TYPE_NCHAR
)
{
if
(
pResultRow
->
key
==
NULL
)
{
pResultRow
->
key
=
malloc
(
varDataTLen
(
pData
));
...
...
@@ -1790,7 +1783,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
numOfRes
=
doCheckQueryCompleted
(
pRuntimeEnv
,
lastKey
,
pWindowResInfo
);
}
else
if
(
pRuntimeEnv
->
groupbyNormalCol
)
{
closeAll
TimeWindow
(
pWindowResInfo
);
closeAll
ResultRows
(
pWindowResInfo
);
numOfRes
=
pWindowResInfo
->
size
;
}
else
{
// projection query
numOfRes
=
(
int32_t
)
getNumOfResult
(
pRuntimeEnv
);
...
...
@@ -2094,7 +2087,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
SQInfo
*
pQInfo
=
(
SQInfo
*
)
GET_QINFO_ADDR
(
pRuntimeEnv
);
qDebug
(
"QInfo:%p teardown runtime env"
,
pQInfo
);
cleanup
TimeWind
owInfo
(
&
pRuntimeEnv
->
windowResInfo
);
cleanup
ResultR
owInfo
(
&
pRuntimeEnv
->
windowResInfo
);
if
(
pRuntimeEnv
->
pCtx
!=
NULL
)
{
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
...
...
@@ -2928,7 +2921,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
)
&&
(
IS_MASTER_SCAN
(
pRuntimeEnv
)
||
pRuntimeEnv
->
scanFlag
==
REPEAT_SCAN
))
{
if
(
Q_STATUS_EQUAL
(
pQuery
->
status
,
QUERY_COMPLETED
))
{
closeAll
TimeWindow
(
&
pRuntimeEnv
->
windowResInfo
);
closeAll
ResultRows
(
&
pRuntimeEnv
->
windowResInfo
);
pRuntimeEnv
->
windowResInfo
.
curIndex
=
pRuntimeEnv
->
windowResInfo
.
size
-
1
;
// point to the last time window
}
else
{
assert
(
Q_STATUS_EQUAL
(
pQuery
->
status
,
QUERY_RESBUF_FULL
));
...
...
@@ -3707,8 +3700,8 @@ void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) {
int32_t
initResultRow
(
SResultRow
*
pResultRow
)
{
pResultRow
->
pCellInfo
=
(
SResultRowCellInfo
*
)((
char
*
)
pResultRow
+
sizeof
(
SResultRow
));
pResultRow
->
pageId
=
-
1
;
pResultRow
->
rowId
=
-
1
;
pResultRow
->
pageId
=
-
1
;
pResultRow
->
rowId
=
-
1
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -4057,12 +4050,12 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
// for each group result, call the finalize function for each column
SResultRowInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
windowResInfo
;
if
(
pRuntimeEnv
->
groupbyNormalCol
)
{
closeAll
TimeWindow
(
pWindowResInfo
);
closeAll
ResultRows
(
pWindowResInfo
);
}
for
(
int32_t
i
=
0
;
i
<
pWindowResInfo
->
size
;
++
i
)
{
SResultRow
*
buf
=
pWindowResInfo
->
pResult
[
i
];
if
(
!
is
WindowRes
Closed
(
pWindowResInfo
,
i
))
{
if
(
!
is
ResultRow
Closed
(
pWindowResInfo
,
i
))
{
continue
;
}
...
...
@@ -4112,7 +4105,7 @@ static STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, void
// set more initial size of interval/groupby query
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
)
||
pRuntimeEnv
->
groupbyNormalCol
)
{
int32_t
initialSize
=
128
;
int32_t
code
=
init
WindowRes
Info
(
&
pTableQueryInfo
->
windowResInfo
,
initialSize
,
TSDB_DATA_TYPE_INT
);
int32_t
code
=
init
ResultRow
Info
(
&
pTableQueryInfo
->
windowResInfo
,
initialSize
,
TSDB_DATA_TYPE_INT
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
NULL
;
}
...
...
@@ -4128,7 +4121,7 @@ void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) {
}
tVariantDestroy
(
&
pTableQueryInfo
->
tag
);
cleanup
TimeWind
owInfo
(
&
pTableQueryInfo
->
windowResInfo
);
cleanup
ResultR
owInfo
(
&
pTableQueryInfo
->
windowResInfo
);
}
/**
...
...
@@ -4360,7 +4353,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SResultRowInfo *pResultInfo, int32_
int32_t
step
=
-
1
;
qDebug
(
"QInfo:%p start to copy data from windowResInfo to query buf"
,
pQInfo
);
int32_t
totalSet
=
numOfClosed
TimeWindow
(
pResultInfo
);
int32_t
totalSet
=
numOfClosed
ResultRows
(
pResultInfo
);
SResultRow
**
result
=
pResultInfo
->
pResult
;
if
(
orderType
==
TSDB_ORDER_ASC
)
{
...
...
@@ -4481,7 +4474,7 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc
// TODO refactor
if
((
pTableQueryInfo
->
lastKey
>=
pTableQueryInfo
->
win
.
ekey
&&
ascQuery
)
||
(
pTableQueryInfo
->
lastKey
<=
pTableQueryInfo
->
win
.
ekey
&&
(
!
ascQuery
)))
{
closeAll
TimeWindow
(
pWindowResInfo
);
closeAll
ResultRows
(
pWindowResInfo
);
pWindowResInfo
->
curIndex
=
pWindowResInfo
->
size
-
1
;
}
else
{
updateResultRowCurrentIndex
(
pWindowResInfo
,
pTableQueryInfo
->
lastKey
,
ascQuery
);
...
...
@@ -5031,7 +5024,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
type
=
TSDB_DATA_TYPE_INT
;
// group id
}
code
=
init
WindowRes
Info
(
&
pRuntimeEnv
->
windowResInfo
,
8
,
type
);
code
=
init
ResultRow
Info
(
&
pRuntimeEnv
->
windowResInfo
,
8
,
type
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
@@ -5051,7 +5044,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
}
code
=
init
WindowRes
Info
(
&
pRuntimeEnv
->
windowResInfo
,
numOfResultRows
,
type
);
code
=
init
ResultRow
Info
(
&
pRuntimeEnv
->
windowResInfo
,
numOfResultRows
,
type
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
@@ -5479,7 +5472,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
pQInfo
->
groupIndex
=
currentGroupIndex
;
// restore the group index
assert
(
pQuery
->
rec
.
rows
==
pWindowResInfo
->
size
);
clearClosed
TimeWindow
(
pRuntimeEnv
);
clearClosed
ResultRows
(
pRuntimeEnv
,
&
pRuntimeEnv
->
windowResInfo
);
break
;
}
}
else
if
(
pRuntimeEnv
->
queryWindowIdentical
&&
pRuntimeEnv
->
pTSBuf
==
NULL
&&
!
isTSCompQuery
(
pQuery
))
{
...
...
@@ -5641,7 +5634,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
}
resetDefaultResInfoOutputBuf
(
pRuntimeEnv
);
reset
TimeWind
owInfo
(
pRuntimeEnv
,
&
pRuntimeEnv
->
windowResInfo
);
reset
ResultR
owInfo
(
pRuntimeEnv
,
&
pRuntimeEnv
->
windowResInfo
);
SArray
*
group
=
GET_TABLEGROUP
(
pQInfo
,
0
);
assert
(
taosArrayGetSize
(
group
)
==
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
&&
...
...
@@ -5796,11 +5789,11 @@ static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) {
size_t
num
=
taosArrayGetSize
(
group
);
for
(
int32_t
j
=
0
;
j
<
num
;
++
j
)
{
STableQueryInfo
*
item
=
taosArrayGetP
(
group
,
j
);
closeAll
TimeWindow
(
&
item
->
windowResInfo
);
closeAll
ResultRows
(
&
item
->
windowResInfo
);
}
}
}
else
{
// close results for group result
closeAll
TimeWindow
(
&
pQInfo
->
runtimeEnv
.
windowResInfo
);
closeAll
ResultRows
(
&
pQInfo
->
runtimeEnv
.
windowResInfo
);
}
}
...
...
@@ -6048,10 +6041,10 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start)
if
((
pQuery
->
numOfFilterCols
>
0
||
pRuntimeEnv
->
pTSBuf
!=
NULL
)
&&
pQuery
->
limit
.
offset
>
0
&&
pQuery
->
fillType
==
TSDB_FILL_NONE
)
{
// maxOutput <= 0, means current query does not generate any results
int32_t
numOfClosed
=
numOfClosed
TimeWindow
(
&
pRuntimeEnv
->
windowResInfo
);
int32_t
numOfClosed
=
numOfClosed
ResultRows
(
&
pRuntimeEnv
->
windowResInfo
);
int32_t
c
=
(
int32_t
)(
MIN
(
numOfClosed
,
pQuery
->
limit
.
offset
));
clearFirstNWindowRes
(
pRuntimeEnv
,
c
);
popFrontResultRow
(
pRuntimeEnv
,
&
pRuntimeEnv
->
windowResInfo
,
c
);
pQuery
->
limit
.
offset
-=
c
;
}
...
...
@@ -6088,7 +6081,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
pQuery
->
rec
.
rows
=
0
;
copyFromWindowResToSData
(
pQInfo
,
&
pRuntimeEnv
->
windowResInfo
);
clearFirstNWindowRes
(
pRuntimeEnv
,
pQInfo
->
groupIndex
);
popFrontResultRow
(
pRuntimeEnv
,
&
pRuntimeEnv
->
windowResInfo
,
pQInfo
->
groupIndex
);
}
// no result generated, abort
...
...
@@ -6121,16 +6114,16 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
// all data scanned, the group by normal column can return
if
(
pRuntimeEnv
->
groupbyNormalCol
)
{
// todo refactor with merge interval time result
// maxOutput <= 0, means current query does not generate any results
int32_t
numOfClosed
=
numOfClosed
TimeWindow
(
&
pRuntimeEnv
->
windowResInfo
);
int32_t
numOfClosed
=
numOfClosed
ResultRows
(
&
pRuntimeEnv
->
windowResInfo
);
if
((
pQuery
->
limit
.
offset
>
0
&&
pQuery
->
limit
.
offset
<
numOfClosed
)
||
pQuery
->
limit
.
offset
==
0
)
{
// skip offset result rows
clearFirstNWindowRes
(
pRuntimeEnv
,
(
int32_t
)
pQuery
->
limit
.
offset
);
popFrontResultRow
(
pRuntimeEnv
,
&
pRuntimeEnv
->
windowResInfo
,
(
int32_t
)
pQuery
->
limit
.
offset
);
pQuery
->
rec
.
rows
=
0
;
pQInfo
->
groupIndex
=
0
;
copyFromWindowResToSData
(
pQInfo
,
&
pRuntimeEnv
->
windowResInfo
);
clearFirstNWindowRes
(
pRuntimeEnv
,
pQInfo
->
groupIndex
);
popFrontResultRow
(
pRuntimeEnv
,
&
pRuntimeEnv
->
windowResInfo
,
pQInfo
->
groupIndex
);
doSecondaryArithmeticProcess
(
pQuery
);
limitResults
(
pRuntimeEnv
);
...
...
@@ -6164,7 +6157,7 @@ static void tableQueryImpl(SQInfo *pQInfo) {
if
(
pRuntimeEnv
->
windowResInfo
.
size
>
0
)
{
copyFromWindowResToSData
(
pQInfo
,
&
pRuntimeEnv
->
windowResInfo
);
clearFirstNWindowRes
(
pRuntimeEnv
,
pQInfo
->
groupIndex
);
popFrontResultRow
(
pRuntimeEnv
,
&
pRuntimeEnv
->
windowResInfo
,
pQInfo
->
groupIndex
);
if
(
pQuery
->
rec
.
rows
>
0
)
{
qDebug
(
"QInfo:%p %"
PRId64
" rows returned from group results, total:%"
PRId64
""
,
pQInfo
,
pQuery
->
rec
.
rows
,
pQuery
->
rec
.
total
);
...
...
@@ -7029,7 +7022,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
pQInfo
->
runtimeEnv
.
pResultRowHashTable
=
taosHashInit
(
pTableGroupInfo
->
numOfTables
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
pQInfo
->
runtimeEnv
.
keyBuf
=
malloc
(
TSDB_MAX_BYTES_PER_ROW
);
pQInfo
->
runtimeEnv
.
pool
=
initResultRowPool
(
get
WindowResult
Size
(
&
pQInfo
->
runtimeEnv
));
pQInfo
->
runtimeEnv
.
pool
=
initResultRowPool
(
get
ResultRow
Size
(
&
pQInfo
->
runtimeEnv
));
pQInfo
->
runtimeEnv
.
prevRow
=
malloc
(
POINTER_BYTES
*
pQuery
->
numOfCols
+
srcSize
);
char
*
start
=
POINTER_BYTES
*
pQuery
->
numOfCols
+
(
char
*
)
pQInfo
->
runtimeEnv
.
prevRow
;
...
...
src/query/src/qUtil.c
浏览文件 @
61d7d5c3
...
...
@@ -43,7 +43,7 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery) {
return
size
;
}
int32_t
init
WindowRes
Info
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
size
,
int16_t
type
)
{
int32_t
init
ResultRow
Info
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
size
,
int16_t
type
)
{
pResultRowInfo
->
capacity
=
size
;
pResultRowInfo
->
type
=
type
;
...
...
@@ -59,10 +59,11 @@ int32_t initWindowResInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t
return
TSDB_CODE_SUCCESS
;
}
void
cleanup
TimeWind
owInfo
(
SResultRowInfo
*
pResultRowInfo
)
{
void
cleanup
ResultR
owInfo
(
SResultRowInfo
*
pResultRowInfo
)
{
if
(
pResultRowInfo
==
NULL
)
{
return
;
}
if
(
pResultRowInfo
->
capacity
==
0
)
{
assert
(
pResultRowInfo
->
pResult
==
NULL
);
return
;
...
...
@@ -77,7 +78,7 @@ void cleanupTimeWindowInfo(SResultRowInfo *pResultRowInfo) {
tfree
(
pResultRowInfo
->
pResult
);
}
void
reset
TimeWind
owInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
)
{
void
reset
ResultR
owInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
)
{
if
(
pResultRowInfo
==
NULL
||
pResultRowInfo
->
capacity
==
0
)
{
return
;
}
...
...
@@ -100,13 +101,12 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultR
pResultRowInfo
->
prevSKey
=
TSKEY_INITIAL_VAL
;
}
void
clearFirstNWindowRes
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
num
)
{
SResultRowInfo
*
pResultRowInfo
=
&
pRuntimeEnv
->
windowResInfo
;
void
popFrontResultRow
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
,
int32_t
num
)
{
if
(
pResultRowInfo
==
NULL
||
pResultRowInfo
->
capacity
==
0
||
pResultRowInfo
->
size
==
0
||
num
==
0
)
{
return
;
}
int32_t
numOfClosed
=
numOfClosed
TimeWindow
(
pResultRowInfo
);
int32_t
numOfClosed
=
numOfClosed
ResultRows
(
pResultRowInfo
);
assert
(
num
>=
0
&&
num
<=
numOfClosed
);
int16_t
type
=
pResultRowInfo
->
type
;
...
...
@@ -159,17 +159,16 @@ void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
pResultRowInfo
->
curIndex
=
-
1
;
}
void
clearClosedTimeWindow
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
SResultRowInfo
*
pResultRowInfo
=
&
pRuntimeEnv
->
windowResInfo
;
void
clearClosedResultRows
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
)
{
if
(
pResultRowInfo
==
NULL
||
pResultRowInfo
->
capacity
==
0
||
pResultRowInfo
->
size
==
0
)
{
return
;
}
int32_t
numOfClosed
=
numOfClosed
TimeWindow
(
pResultRowInfo
);
clearFirstNWindowRes
(
pRuntimeEnv
,
numOfClosed
);
int32_t
numOfClosed
=
numOfClosed
ResultRows
(
pResultRowInfo
);
popFrontResultRow
(
pRuntimeEnv
,
&
pRuntimeEnv
->
windowResInfo
,
numOfClosed
);
}
int32_t
numOfClosed
TimeWindow
(
SResultRowInfo
*
pResultRowInfo
)
{
int32_t
numOfClosed
ResultRows
(
SResultRowInfo
*
pResultRowInfo
)
{
int32_t
i
=
0
;
while
(
i
<
pResultRowInfo
->
size
&&
pResultRowInfo
->
pResult
[
i
]
->
closed
)
{
++
i
;
...
...
@@ -178,7 +177,7 @@ int32_t numOfClosedTimeWindow(SResultRowInfo *pResultRowInfo) {
return
i
;
}
void
closeAll
TimeWindow
(
SResultRowInfo
*
pResultRowInfo
)
{
void
closeAll
ResultRows
(
SResultRowInfo
*
pResultRowInfo
)
{
assert
(
pResultRowInfo
->
size
>=
0
&&
pResultRowInfo
->
capacity
>=
pResultRowInfo
->
size
);
for
(
int32_t
i
=
0
;
i
<
pResultRowInfo
->
size
;
++
i
)
{
...
...
@@ -195,7 +194,7 @@ void closeAllTimeWindow(SResultRowInfo *pResultRowInfo) {
* the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time.
* NOTE: remove redundant, only when the result set order equals to traverse order
*/
void
removeRedundant
Window
(
SResultRowInfo
*
pResultRowInfo
,
TSKEY
lastKey
,
int32_t
order
)
{
void
removeRedundant
ResultRows
(
SResultRowInfo
*
pResultRowInfo
,
TSKEY
lastKey
,
int32_t
order
)
{
assert
(
pResultRowInfo
->
size
>=
0
&&
pResultRowInfo
->
capacity
>=
pResultRowInfo
->
size
);
if
(
pResultRowInfo
->
size
<=
1
)
{
return
;
...
...
@@ -224,27 +223,27 @@ void removeRedundantWindow(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_
}
}
bool
is
WindowRes
Closed
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
)
{
bool
is
ResultRow
Closed
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
)
{
return
(
getResultRow
(
pResultRowInfo
,
slot
)
->
closed
==
true
);
}
void
close
TimeWind
ow
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
)
{
void
close
ResultR
ow
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
)
{
getResultRow
(
pResultRowInfo
,
slot
)
->
closed
=
true
;
}
void
clearResultRow
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
p
WindowRes
,
int16_t
type
)
{
if
(
p
WindowRes
==
NULL
)
{
void
clearResultRow
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
p
ResultRow
,
int16_t
type
)
{
if
(
p
ResultRow
==
NULL
)
{
return
;
}
// the result does not put into the SDiskbasedResultBuf, ignore it.
if
(
p
WindowRes
->
pageId
>=
0
)
{
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
p
WindowRes
->
pageId
);
if
(
p
ResultRow
->
pageId
>=
0
)
{
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
p
ResultRow
->
pageId
);
for
(
int32_t
i
=
0
;
i
<
pRuntimeEnv
->
pQuery
->
numOfOutput
;
++
i
)
{
SResultRowCellInfo
*
pResultInfo
=
&
p
WindowRes
->
pCellInfo
[
i
];
SResultRowCellInfo
*
pResultInfo
=
&
p
ResultRow
->
pCellInfo
[
i
];
char
*
s
=
getPosInResultPage
(
pRuntimeEnv
,
i
,
p
WindowRes
,
page
);
char
*
s
=
getPosInResultPage
(
pRuntimeEnv
,
i
,
p
ResultRow
,
page
);
size_t
size
=
pRuntimeEnv
->
pQuery
->
pExpr1
[
i
].
bytes
;
memset
(
s
,
0
,
size
);
...
...
@@ -252,15 +251,15 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pWindowRes, int16
}
}
p
WindowRes
->
numOfRows
=
0
;
p
WindowRes
->
pageId
=
-
1
;
p
WindowRes
->
rowId
=
-
1
;
p
WindowRes
->
closed
=
false
;
p
ResultRow
->
numOfRows
=
0
;
p
ResultRow
->
pageId
=
-
1
;
p
ResultRow
->
rowId
=
-
1
;
p
ResultRow
->
closed
=
false
;
if
(
type
==
TSDB_DATA_TYPE_BINARY
||
type
==
TSDB_DATA_TYPE_NCHAR
)
{
tfree
(
p
WindowRes
->
key
);
tfree
(
p
ResultRow
->
key
);
}
else
{
p
WindowRes
->
win
=
TSWINDOW_INITIALIZER
;
p
ResultRow
->
win
=
TSWINDOW_INITIALIZER
;
}
}
...
...
@@ -310,7 +309,7 @@ SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRo
return
(
SResultRowCellInfo
*
)((
char
*
)
pRow
->
pCellInfo
+
pRuntimeEnv
->
rowCellInfoOffset
[
index
]);
}
size_t
get
WindowResult
Size
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
size_t
get
ResultRow
Size
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
return
(
pRuntimeEnv
->
pQuery
->
numOfOutput
*
sizeof
(
SResultRowCellInfo
))
+
pRuntimeEnv
->
interBufSize
+
sizeof
(
SResultRow
);
}
...
...
tests/script/general/parser/fill.sim
浏览文件 @
61d7d5c3
...
...
@@ -848,10 +848,7 @@ if $rows != 12 then
return -1
endi
print =====================>td-1442
sql_error select count(*) from m_fl_tb0 interval(1s) fill(prev);
print =====================> aggregation + arithmetic + fill
print =====================> aggregation + arithmetic + fill, need to add cases TODO
#sql select avg(cpu_taosd) - first(cpu_taosd) from dn1 where ts<'2020-11-13 11:00:00' and ts>'2020-11-13 10:50:00' interval(10s) fill(value, 99)
#sql select count(*), first(k), avg(k), avg(k)-first(k) from tm0 where ts>'2020-1-1 1:1:1' and ts<'2020-1-1 1:02:59' interval(10s) fill(value, 99);
#sql select count(*), first(k), avg(k), avg(k)-first(k) from tm0 where ts>'2020-1-1 1:1:1' and ts<'2020-1-1 1:02:59' interval(10s) fill(NULL);
...
...
@@ -1044,6 +1041,17 @@ if $data12 != 1 then
return -1
endi
print =====================>td-1442, td-2190 , no time range for fill option
sql_error select count(*) from m_fl_tb0 interval(1s) fill(prev);
sql_error select min(c3) from m_fl_mt0 interval(10a) fill(value, 20)
sql_error select min(c3) from m_fl_mt0 interval(10s) fill(value, 20)
sql_error select min(c3) from m_fl_mt0 interval(10m) fill(value, 20)
sql_error select min(c3) from m_fl_mt0 interval(10h) fill(value, 20)
sql_error select min(c3) from m_fl_mt0 interval(10d) fill(value, 20)
sql_error select min(c3) from m_fl_mt0 interval(10w) fill(value, 20)
sql_error select max(c3) from m_fl_mt0 interval(1n) fill(prev)
sql_error select min(c3) from m_fl_mt0 interval(1y) fill(value, 20)
print =============== clear
#sql drop database $db
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录