Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
loser646
TDengine
提交
b6efe1c4
T
TDengine
项目概览
loser646
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
b6efe1c4
编写于
8月 04, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge from develop
上级
e45b047e
a73dd777
变更
12
展开全部
隐藏空白更改
内联
并排
Showing
12 changed file
with
1665 addition
and
30 deletion
+1665
-30
src/client/src/tscParseInsert.c
src/client/src/tscParseInsert.c
+1
-3
src/kit/taosdemo/taosdemo.c
src/kit/taosdemo/taosdemo.c
+2
-0
src/mnode/src/mnodeShow.c
src/mnode/src/mnodeShow.c
+5
-1
src/mnode/src/mnodeWrite.c
src/mnode/src/mnodeWrite.c
+8
-1
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+5
-0
src/query/src/qAggMain.c
src/query/src/qAggMain.c
+42
-10
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+275
-10
src/tsdb/src/tsdbRead.c
src/tsdb/src/tsdbRead.c
+1
-1
tests/pytest/fulltest.sh
tests/pytest/fulltest.sh
+1
-1
tests/script/general/parser/interp.sim
tests/script/general/parser/interp.sim
+4
-1
tests/script/general/parser/interp_test.sim
tests/script/general/parser/interp_test.sim
+1320
-1
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+1
-1
未找到文件。
src/client/src/tscParseInsert.c
浏览文件 @
b6efe1c4
...
...
@@ -991,9 +991,7 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
index
=
0
;
sToken
=
tStrGetToken
(
*
str
,
&
index
,
false
);
if
(
sToken
.
n
==
0
||
sToken
.
type
!=
TK_RP
)
{
tscSQLSyntaxErrMsg
(
pInsertParam
->
msg
,
") expected"
,
*
str
);
code
=
TSDB_CODE_TSC_SQL_SYNTAX_ERROR
;
return
code
;
return
tscSQLSyntaxErrMsg
(
pInsertParam
->
msg
,
") expected"
,
*
str
);
}
*
str
+=
index
;
...
...
src/kit/taosdemo/taosdemo.c
浏览文件 @
b6efe1c4
...
...
@@ -75,6 +75,7 @@ extern char configDir[];
#define BUFFER_SIZE TSDB_MAX_ALLOWED_SQL_LEN
#define COND_BUF_LEN (BUFFER_SIZE - 30)
#define COL_BUFFER_LEN ((TSDB_COL_NAME_LEN + 15) * TSDB_MAX_COLUMNS)
#define MAX_USERNAME_SIZE 64
#define MAX_PASSWORD_SIZE 64
#define MAX_HOSTNAME_SIZE 253 // https://man7.org/linux/man-pages/man7/hostname.7.html
...
...
@@ -1413,6 +1414,7 @@ static char *rand_float_str()
return
g_randfloat_buff
+
(
cursor
*
FLOAT_BUFF_LEN
);
}
static
float
rand_float
()
{
static
int
cursor
;
...
...
src/mnode/src/mnodeShow.c
浏览文件 @
b6efe1c4
...
...
@@ -253,11 +253,15 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
int32_t
connId
=
htonl
(
pHBMsg
->
connId
);
SConnObj
*
pConn
=
mnodeAccquireConn
(
connId
,
connInfo
.
user
,
connInfo
.
clientIp
,
connInfo
.
clientPort
);
if
(
pConn
==
NULL
)
{
pHBMsg
->
pid
=
htonl
(
pHBMsg
->
pid
);
pConn
=
mnodeCreateConn
(
connInfo
.
user
,
connInfo
.
clientIp
,
connInfo
.
clientPort
,
pHBMsg
->
pid
,
pHBMsg
->
appName
);
}
if
(
pConn
==
NULL
)
{
// do not close existing links, otherwise
// mError("failed to create connId, close connect");
// pRsp->killConnection = 1;
// pRsp->killConnection = 1;
}
else
{
pRsp
->
connId
=
htonl
(
pConn
->
connId
);
mnodeSaveQueryStreamList
(
pConn
,
pHBMsg
);
...
...
src/mnode/src/mnodeWrite.c
浏览文件 @
b6efe1c4
...
...
@@ -65,7 +65,14 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
return
TSDB_CODE_MND_MSG_NOT_PROCESSED
;
}
int32_t
code
=
mnodeInitMsg
(
pMsg
);
int32_t
code
=
grantCheck
(
TSDB_GRANT_TIME
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"msg:%p, app:%p type:%s not processed, reason:%s"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
taosMsg
[
pMsg
->
rpcMsg
.
msgType
],
tstrerror
(
code
));
return
code
;
}
code
=
mnodeInitMsg
(
pMsg
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"msg:%p, app:%p type:%s not processed, reason:%s"
,
pMsg
,
pMsg
->
rpcMsg
.
ahandle
,
taosMsg
[
pMsg
->
rpcMsg
.
msgType
],
tstrerror
(
code
));
...
...
src/query/inc/qExecutor.h
浏览文件 @
b6efe1c4
...
...
@@ -206,6 +206,7 @@ typedef struct SQueryAttr {
bool
stableQuery
;
// super table query or not
bool
topBotQuery
;
// TODO used bitwise flag
bool
interpQuery
;
// denote if this is an interp query
bool
groupbyColumn
;
// denote if this is a groupby normal column query
bool
hasTagResults
;
// if there are tag values in final result or not
bool
timeWindowInterpo
;
// if the time window start/end required interpolation
...
...
@@ -331,6 +332,8 @@ enum OPERATOR_TYPE_E {
OP_Distinct
=
20
,
OP_Join
=
21
,
OP_StateWindow
=
22
,
OP_AllTimeWindow
=
23
,
OP_AllMultiTableTimeInterval
=
24
,
};
typedef
struct
SOperatorInfo
{
...
...
@@ -549,11 +552,13 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
SOperatorInfo
*
createProjectOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createLimitOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
);
SOperatorInfo
*
createTimeIntervalOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createAllTimeIntervalOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createSWindowOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createFillOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createGroupbyOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createMultiTableAggOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createMultiTableTimeIntervalOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createAllMultiTableTimeIntervalOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createTagScanOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createDistinctOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createTableBlockInfoScanOperator
(
void
*
pTsdbQueryHandle
,
SQueryRuntimeEnv
*
pRuntimeEnv
);
...
...
src/query/src/qAggMain.c
浏览文件 @
b6efe1c4
...
...
@@ -3708,27 +3708,59 @@ static void interp_function_impl(SQLFunctionCtx *pCtx) {
}
}
else
{
// no data generated yet
if
(
pCtx
->
size
==
1
)
{
if
(
pCtx
->
size
<
1
)
{
return
;
}
// check the timestamp in input buffer
TSKEY
skey
=
GET_TS_DATA
(
pCtx
,
0
);
TSKEY
ekey
=
GET_TS_DATA
(
pCtx
,
1
);
// no data generated yet
if
(
!
(
skey
<
pCtx
->
startTs
&&
ekey
>
pCtx
->
startTs
))
{
return
;
}
assert
(
pCtx
->
start
.
key
==
INT64_MIN
&&
skey
<
pCtx
->
startTs
&&
ekey
>
pCtx
->
startTs
);
if
(
type
==
TSDB_FILL_PREV
)
{
if
(
skey
>
pCtx
->
startTs
)
{
return
;
}
if
(
pCtx
->
size
>
1
)
{
TSKEY
ekey
=
GET_TS_DATA
(
pCtx
,
1
);
if
(
ekey
>
skey
&&
ekey
<=
pCtx
->
startTs
)
{
skey
=
ekey
;
}
}
assignVal
(
pCtx
->
pOutput
,
pCtx
->
pInput
,
pCtx
->
outputBytes
,
pCtx
->
inputType
);
}
else
if
(
type
==
TSDB_FILL_NEXT
)
{
char
*
val
=
((
char
*
)
pCtx
->
pInput
)
+
pCtx
->
inputBytes
;
TSKEY
ekey
=
skey
;
char
*
val
=
NULL
;
if
(
ekey
<
pCtx
->
startTs
)
{
if
(
pCtx
->
size
>
1
)
{
ekey
=
GET_TS_DATA
(
pCtx
,
1
);
if
(
ekey
<
pCtx
->
startTs
)
{
return
;
}
val
=
((
char
*
)
pCtx
->
pInput
)
+
pCtx
->
inputBytes
;
}
else
{
return
;
}
}
else
{
val
=
(
char
*
)
pCtx
->
pInput
;
}
assignVal
(
pCtx
->
pOutput
,
val
,
pCtx
->
outputBytes
,
pCtx
->
inputType
);
}
else
if
(
type
==
TSDB_FILL_LINEAR
)
{
if
(
pCtx
->
size
<=
1
)
{
return
;
}
TSKEY
ekey
=
GET_TS_DATA
(
pCtx
,
1
);
// no data generated yet
if
(
!
(
skey
<
pCtx
->
startTs
&&
ekey
>
pCtx
->
startTs
))
{
return
;
}
assert
(
pCtx
->
start
.
key
==
INT64_MIN
&&
skey
<
pCtx
->
startTs
&&
ekey
>
pCtx
->
startTs
);
char
*
start
=
GET_INPUT_DATA
(
pCtx
,
0
);
char
*
end
=
GET_INPUT_DATA
(
pCtx
,
1
);
...
...
src/query/src/qExecutor.c
浏览文件 @
b6efe1c4
...
...
@@ -448,6 +448,44 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, SQueryRuntim
pResultRowInfo
->
capacity
=
(
int32_t
)
newCapacity
;
}
static
bool
chkResultRowFromKey
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
,
char
*
pData
,
int16_t
bytes
,
bool
masterscan
,
uint64_t
uid
)
{
bool
existed
=
false
;
SET_RES_WINDOW_KEY
(
pRuntimeEnv
->
keyBuf
,
pData
,
bytes
,
uid
);
SResultRow
**
p1
=
(
SResultRow
**
)
taosHashGet
(
pRuntimeEnv
->
pResultRowHashTable
,
pRuntimeEnv
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
// in case of repeat scan/reverse scan, no new time window added.
if
(
QUERY_IS_INTERVAL_QUERY
(
pRuntimeEnv
->
pQueryAttr
))
{
if
(
!
masterscan
)
{
// the *p1 may be NULL in case of sliding+offset exists.
return
p1
!=
NULL
;
}
if
(
p1
!=
NULL
)
{
if
(
pResultRowInfo
->
size
==
0
)
{
existed
=
false
;
assert
(
pResultRowInfo
->
curPos
==
-
1
);
}
else
if
(
pResultRowInfo
->
size
==
1
)
{
existed
=
(
pResultRowInfo
->
pResult
[
0
]
==
(
*
p1
));
}
else
{
// check if current pResultRowInfo contains the existed pResultRow
SET_RES_EXT_WINDOW_KEY
(
pRuntimeEnv
->
keyBuf
,
pData
,
bytes
,
uid
,
pResultRowInfo
);
int64_t
*
index
=
taosHashGet
(
pRuntimeEnv
->
pResultRowListSet
,
pRuntimeEnv
->
keyBuf
,
GET_RES_EXT_WINDOW_KEY_LEN
(
bytes
));
if
(
index
!=
NULL
)
{
existed
=
true
;
}
else
{
existed
=
false
;
}
}
}
return
existed
;
}
return
p1
!=
NULL
;
}
static
SResultRow
*
doSetResultOutBufByKey
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
,
int64_t
tid
,
char
*
pData
,
int16_t
bytes
,
bool
masterscan
,
uint64_t
tableGroupId
)
{
bool
existed
=
false
;
...
...
@@ -592,6 +630,43 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t
return
w
;
}
// get the correct time window according to the handled timestamp
static
STimeWindow
getCurrentActiveTimeWindow
(
SResultRowInfo
*
pResultRowInfo
,
int64_t
ts
,
SQueryAttr
*
pQuery
)
{
STimeWindow
w
=
{
0
};
#if 0
if (pResultRowInfo->curIndex == -1) { // the first window, from the previous stored value
if (pResultRowInfo->prevSKey == TSKEY_INITIAL_VAL) {
getInitialStartTimeWindow(pQuery, ts, &w);
pResultRowInfo->prevSKey = w.skey;
} else {
w.skey = pResultRowInfo->prevSKey;
}
if (pQuery->interval.intervalUnit == 'n' || pQuery->interval.intervalUnit == 'y') {
w.ekey = taosTimeAdd(w.skey, pQuery->interval.interval, pQuery->interval.intervalUnit, pQuery->precision) - 1;
} else {
w.ekey = w.skey + pQuery->interval.interval - 1;
}
} else {
int32_t slot = curTimeWindowIndex(pResultRowInfo);
SResultRow* pWindowRes = getResultRow(pResultRowInfo, slot);
w = pWindowRes->win;
}
/*
* query border check, skey should not be bounded by the query time range, since the value skey will
* be used as the time window index value. So we only change ekey of time window accordingly.
*/
if (w.ekey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) {
w.ekey = pQuery->window.ekey;
}
#endif
return
w
;
}
// a new buffer page for each table. Needs to opt this design
static
int32_t
addNewWindowResultBuf
(
SResultRow
*
pWindowRes
,
SDiskbasedResultBuf
*
pResultBuf
,
int32_t
tid
,
uint32_t
size
)
{
if
(
pWindowRes
->
pageId
!=
-
1
)
{
...
...
@@ -637,6 +712,14 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf
return
0
;
}
static
bool
chkWindowOutputBufByKey
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
,
STimeWindow
*
win
,
bool
masterscan
,
SResultRow
**
pResult
,
int64_t
groupId
,
SQLFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
*
rowCellInfoOffset
)
{
assert
(
win
->
skey
<=
win
->
ekey
);
return
chkResultRowFromKey
(
pRuntimeEnv
,
pResultRowInfo
,
(
char
*
)
&
win
->
skey
,
TSDB_KEYSIZE
,
masterscan
,
groupId
);
}
static
int32_t
setResultOutputBufByKey
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
,
int64_t
tid
,
STimeWindow
*
win
,
bool
masterscan
,
SResultRow
**
pResult
,
int64_t
tableGroupId
,
SQLFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
*
rowCellInfoOffset
)
{
...
...
@@ -707,7 +790,7 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se
}
}
assert
(
forwardStep
>
0
);
assert
(
forwardStep
>
=
0
);
return
forwardStep
;
}
...
...
@@ -764,6 +847,8 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey,
pResultRowInfo
->
curPos
=
i
+
1
;
// current not closed result object
}
}
//pResultRowInfo->prevSKey = pResultRowInfo->pResult[pResultRowInfo->curIndex]->win.skey;
}
static
void
updateResultRowInfoActiveIndex
(
SResultRowInfo
*
pResultRowInfo
,
SQueryAttr
*
pQueryAttr
,
TSKEY
lastKey
)
{
...
...
@@ -813,7 +898,7 @@ static int32_t getNumOfRowsInTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc
}
}
assert
(
num
>
0
);
assert
(
num
>
=
0
);
return
num
;
}
...
...
@@ -973,6 +1058,11 @@ static int32_t getNextQualifiedWindow(SQueryAttr* pQueryAttr, STimeWindow *pNext
}
}
/* interp query with fill should not skip time window */
if
(
pQueryAttr
->
interpQuery
&&
pQueryAttr
->
fillType
!=
TSDB_FILL_NONE
)
{
return
startPos
;
}
/*
* This time window does not cover any data, try next time window,
* this case may happen when the time window is too small
...
...
@@ -1485,6 +1575,85 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
updateResultRowInfoActiveIndex
(
pResultRowInfo
,
pQueryAttr
,
pRuntimeEnv
->
current
->
lastKey
);
}
static
void
hashAllIntervalAgg
(
SOperatorInfo
*
pOperatorInfo
,
SResultRowInfo
*
pResultRowInfo
,
SSDataBlock
*
pSDataBlock
,
int32_t
groupId
)
{
(
void
)
getCurrentActiveTimeWindow
;
#if 0
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) pOperatorInfo->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
int32_t numOfOutput = pOperatorInfo->numOfOutput;
SQueryAttr* pQuery = pRuntimeEnv->pQueryAttr;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
bool ascQuery = QUERY_IS_ASC_QUERY(pQuery);
TSKEY* tsCols = NULL;
if (pSDataBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, 0);
tsCols = (int64_t*) pColDataInfo->pData;
assert(tsCols[0] == pSDataBlock->info.window.skey &&
tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey);
}
int32_t startPos = ascQuery? 0 : (pSDataBlock->info.rows - 1);
TSKEY ts = getStartTsKey(pQuery, &pSDataBlock->info.window, tsCols, pSDataBlock->info.rows);
STimeWindow win = getCurrentActiveTimeWindow(pResultRowInfo, ts, pQuery);
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
SResultRow* pResult = NULL;
int32_t forwardStep = 0;
while (1) {
// null data, failed to allocate more memory buffer
int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, groupId,
pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
TSKEY ekey = reviseWindowEkey(pQuery, &win);
forwardStep = getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true);
// window start(end) key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
int32_t prevEndPos = (forwardStep - 1) * step + startPos;
startPos = getNextQualifiedWindow(pQuery, &win, &pSDataBlock->info, tsCols, binarySearchForKey, prevEndPos);
if (startPos < 0) {
if (win.skey <= pQuery->window.ekey) {
int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, groupId,
pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
startPos = pSDataBlock->info.rows - 1;
// window start(end) key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
}
break;
}
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
}
if (pQuery->timeWindowInterpo) {
int32_t rowIndex = ascQuery? (pSDataBlock->info.rows-1):0;
saveDataBlockLastRow(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, rowIndex);
}
updateResultRowInfoActiveIndex(pResultRowInfo, pQuery, pQuery->current->lastKey);
#endif
}
static
void
doHashGroupbyAgg
(
SOperatorInfo
*
pOperator
,
SGroupbyOperatorInfo
*
pInfo
,
SSDataBlock
*
pSDataBlock
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
pOperator
->
pRuntimeEnv
;
STableQueryInfo
*
item
=
pRuntimeEnv
->
current
;
...
...
@@ -2918,6 +3087,8 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
// check if this data block is required to load
if
((
*
status
)
!=
BLK_DATA_ALL_NEEDED
)
{
bool
needFilter
=
true
;
// the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet,
// the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer
if
(
QUERY_IS_INTERVAL_QUERY
(
pQueryAttr
))
{
...
...
@@ -2927,10 +3098,16 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
TSKEY
k
=
ascQuery
?
pBlock
->
info
.
window
.
skey
:
pBlock
->
info
.
window
.
ekey
;
STimeWindow
win
=
getActiveTimeWindow
(
pTableScanInfo
->
pResultRowInfo
,
k
,
pQueryAttr
);
if
(
setResultOutputBufByKey
(
pRuntimeEnv
,
pTableScanInfo
->
pResultRowInfo
,
pBlock
->
info
.
tid
,
&
win
,
masterScan
,
&
pResult
,
groupId
,
pTableScanInfo
->
pCtx
,
pTableScanInfo
->
numOfOutput
,
pTableScanInfo
->
rowCellInfoOffset
)
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
if
(
pQueryAttr
->
interpQuery
)
{
needFilter
=
chkWindowOutputBufByKey
(
pRuntimeEnv
,
pTableScanInfo
->
pResultRowInfo
,
&
win
,
masterScan
,
&
pResult
,
groupId
,
pTableScanInfo
->
pCtx
,
pTableScanInfo
->
numOfOutput
,
pTableScanInfo
->
rowCellInfoOffset
);
}
else
{
if
(
setResultOutputBufByKey
(
pRuntimeEnv
,
pTableScanInfo
->
pResultRowInfo
,
pBlock
->
info
.
tid
,
&
win
,
masterScan
,
&
pResult
,
groupId
,
pTableScanInfo
->
pCtx
,
pTableScanInfo
->
numOfOutput
,
pTableScanInfo
->
rowCellInfoOffset
)
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
}
else
if
(
pQueryAttr
->
stableQuery
&&
(
!
pQueryAttr
->
tsCompQuery
)
&&
(
!
pQueryAttr
->
diffQuery
))
{
// stable aggregate, not interval aggregate or normal column aggregate
doSetTableGroupOutputBuf
(
pRuntimeEnv
,
pTableScanInfo
->
pResultRowInfo
,
pTableScanInfo
->
pCtx
,
...
...
@@ -2938,7 +3115,11 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
pRuntimeEnv
->
current
->
groupIndex
);
}
(
*
status
)
=
doFilterByBlockTimeWindow
(
pTableScanInfo
,
pBlock
);
if
(
needFilter
)
{
(
*
status
)
=
doFilterByBlockTimeWindow
(
pTableScanInfo
,
pBlock
);
}
else
{
(
*
status
)
=
BLK_DATA_ALL_NEEDED
;
}
}
SDataBlockInfo
*
pBlockInfo
=
&
pBlock
->
info
;
...
...
@@ -4546,6 +4727,7 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
SQueryAttr
*
pQueryAttr
=
pQInfo
->
runtimeEnv
.
pQueryAttr
;
pQueryAttr
->
tsdb
=
tsdb
;
if
(
tsdb
!=
NULL
)
{
int32_t
code
=
setupQueryHandle
(
tsdb
,
pRuntimeEnv
,
pQInfo
->
qId
,
pQueryAttr
->
stableQuery
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -4954,7 +5136,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
pTableScanInfo
->
pCtx
=
pAggInfo
->
binfo
.
pCtx
;
pTableScanInfo
->
pResultRowInfo
=
&
pAggInfo
->
binfo
.
resultRowInfo
;
pTableScanInfo
->
rowCellInfoOffset
=
pAggInfo
->
binfo
.
rowCellInfoOffset
;
}
else
if
(
pDownstream
->
operatorType
==
OP_TimeWindow
)
{
}
else
if
(
pDownstream
->
operatorType
==
OP_TimeWindow
||
pDownstream
->
operatorType
==
OP_AllTimeWindow
)
{
STableIntervalOperatorInfo
*
pIntervalInfo
=
pDownstream
->
info
;
pTableScanInfo
->
pCtx
=
pIntervalInfo
->
pCtx
;
...
...
@@ -4968,7 +5150,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
pTableScanInfo
->
pResultRowInfo
=
&
pGroupbyInfo
->
binfo
.
resultRowInfo
;
pTableScanInfo
->
rowCellInfoOffset
=
pGroupbyInfo
->
binfo
.
rowCellInfoOffset
;
}
else
if
(
pDownstream
->
operatorType
==
OP_MultiTableTimeInterval
)
{
}
else
if
(
pDownstream
->
operatorType
==
OP_MultiTableTimeInterval
||
pDownstream
->
operatorType
==
OP_AllMultiTableTimeInterval
)
{
STableIntervalOperatorInfo
*
pInfo
=
pDownstream
->
info
;
pTableScanInfo
->
pCtx
=
pInfo
->
pCtx
;
...
...
@@ -5642,6 +5824,63 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
return
pIntervalInfo
->
pRes
;
}
static
SSDataBlock
*
doAllSTableIntervalAgg
(
void
*
param
,
bool
*
newgroup
)
{
SOperatorInfo
*
pOperator
=
(
SOperatorInfo
*
)
param
;
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
STableIntervalOperatorInfo
*
pIntervalInfo
=
pOperator
->
info
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
pOperator
->
pRuntimeEnv
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
copyToSDataBlock
(
pRuntimeEnv
,
3000
,
pIntervalInfo
->
pRes
,
pIntervalInfo
->
rowCellInfoOffset
);
if
(
pIntervalInfo
->
pRes
->
info
.
rows
==
0
||
!
hasRemainData
(
&
pRuntimeEnv
->
groupResInfo
))
{
pOperator
->
status
=
OP_EXEC_DONE
;
}
return
pIntervalInfo
->
pRes
;
}
SQueryAttr
*
pQueryAttr
=
pRuntimeEnv
->
pQueryAttr
;
int32_t
order
=
pQueryAttr
->
order
.
order
;
SOperatorInfo
*
upstream
=
pOperator
->
upstream
[
0
];
while
(
1
)
{
publishOperatorProfEvent
(
upstream
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
SSDataBlock
*
pBlock
=
upstream
->
exec
(
upstream
,
newgroup
);
publishOperatorProfEvent
(
upstream
,
QUERY_PROF_AFTER_OPERATOR_EXEC
);
if
(
pBlock
==
NULL
)
{
break
;
}
// the pDataBlock are always the same one, no need to call this again
STableQueryInfo
*
pTableQueryInfo
=
pRuntimeEnv
->
current
;
setTagValue
(
pOperator
,
pTableQueryInfo
->
pTable
,
pIntervalInfo
->
pCtx
,
pOperator
->
numOfOutput
);
setInputDataBlock
(
pOperator
,
pIntervalInfo
->
pCtx
,
pBlock
,
pQueryAttr
->
order
.
order
);
setIntervalQueryRange
(
pRuntimeEnv
,
pBlock
->
info
.
window
.
skey
);
hashAllIntervalAgg
(
pOperator
,
&
pTableQueryInfo
->
resInfo
,
pBlock
,
pTableQueryInfo
->
groupIndex
);
}
pOperator
->
status
=
OP_RES_TO_RETURN
;
pQueryAttr
->
order
.
order
=
order
;
// TODO : restore the order
doCloseAllTimeWindow
(
pRuntimeEnv
);
setQueryStatus
(
pRuntimeEnv
,
QUERY_COMPLETED
);
copyToSDataBlock
(
pRuntimeEnv
,
3000
,
pIntervalInfo
->
pRes
,
pIntervalInfo
->
rowCellInfoOffset
);
if
(
pIntervalInfo
->
pRes
->
info
.
rows
==
0
||
!
hasRemainData
(
&
pRuntimeEnv
->
groupResInfo
))
{
pOperator
->
status
=
OP_EXEC_DONE
;
}
return
pIntervalInfo
->
pRes
;
}
static
void
doStateWindowAggImpl
(
SOperatorInfo
*
pOperator
,
SStateWindowOperatorInfo
*
pInfo
,
SSDataBlock
*
pSDataBlock
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
pOperator
->
pRuntimeEnv
;
...
...
@@ -6285,8 +6524,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
appendUpstream
(
pOperator
,
upstream
);
return
pOperator
;
}
SOperatorInfo
*
createSWindowOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SSWindowOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SSWindowOperatorInfo
));
...
...
@@ -6337,6 +6576,32 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti
return
pOperator
;
}
SOperatorInfo
*
createAllMultiTableTimeIntervalOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
STableIntervalOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STableIntervalOperatorInfo
));
pInfo
->
pCtx
=
createSQLFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
rowCellInfoOffset
);
pInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
initResultRowInfo
(
&
pInfo
->
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"AllMultiTableTimeIntervalOperator"
;
pOperator
->
operatorType
=
OP_AllMultiTableTimeInterval
;
pOperator
->
blockingOptr
=
true
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
info
=
pInfo
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
exec
=
doAllSTableIntervalAgg
;
pOperator
->
cleanup
=
destroyBasicOperatorInfo
;
appendUpstream
(
pOperator
,
upstream
);
return
pOperator
;
}
SOperatorInfo
*
createGroupbyOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SGroupbyOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SGroupbyOperatorInfo
));
pInfo
->
colIndex
=
-
1
;
// group by column index
...
...
src/tsdb/src/tsdbRead.c
浏览文件 @
b6efe1c4
...
...
@@ -2693,7 +2693,7 @@ static void destroyHelper(void* param) {
free
(
param
);
}
static
bool
loadBlockOfActiveTable
(
STsdbQueryHandle
*
pQueryHandle
)
{
static
bool
loadBlockOfActiveTable
(
STsdbQueryHandle
*
pQueryHandle
)
{
if
(
pQueryHandle
->
checkFiles
)
{
// check if the query range overlaps with the file data block
bool
exists
=
true
;
...
...
tests/pytest/fulltest.sh
浏览文件 @
b6efe1c4
...
...
@@ -169,7 +169,7 @@ python3 test.py -f tools/taosdemoTestQuery.py
# nano support
python3 test.py
-f
tools/taosdemoAllTest/taosdemoTestSupportNanoInsert.py
python3 test.py
-f
tools/taosdemoAllTest/taosdemoTestSupportNanoQuery.py
python3 test.py
-f
tools/taosdemoAllTest/taosdemoTestSupportNanosubscribe.py
#
python3 test.py -f tools/taosdemoAllTest/taosdemoTestSupportNanosubscribe.py
python3 test.py
-f
tools/taosdemoAllTest/taosdemoTestInsertTime_step.py
python3 test.py
-f
tools/taosdumpTestNanoSupport.py
...
...
tests/script/general/parser/interp.sim
浏览文件 @
b6efe1c4
...
...
@@ -55,6 +55,9 @@ while $i < $halfNum
endw
print ====== tables created
sql create table ap1 (ts timestamp, pav float);
sql INSERT INTO ap1 VALUES ('2021-07-25 02:19:54.100',1) ('2021-07-25 02:19:54.200',2) ('2021-07-25 02:19:54.300',3) ('2021-07-25 02:19:56.500',4) ('2021-07-25 02:19:57.500',5) ('2021-07-25 02:19:57.600',6) ('2021-07-25 02:19:57.900',7) ('2021-07-25 02:19:58.100',8) ('2021-07-25 02:19:58.300',9) ('2021-07-25 02:19:59.100',10) ('2021-07-25 02:19:59.300',11) ('2021-07-25 02:19:59.500',12) ('2021-07-25 02:19:59.700',13) ('2021-07-25 02:19:59.900',14) ('2021-07-25 02:20:05.000', 20) ('2021-07-25 02:25:00.000', 10000);
run general/parser/interp_test.sim
print ================== restart server to commit data into disk
...
...
@@ -65,4 +68,4 @@ print ================== server restart completed
run general/parser/interp_test.sim
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
tests/script/general/parser/interp_test.sim
浏览文件 @
b6efe1c4
此差异已折叠。
点击以展开。
tests/script/jenkins/basic.txt
浏览文件 @
b6efe1c4
...
...
@@ -74,7 +74,7 @@ cd ../../../debug; make
./test.sh -f general/parser/where.sim
./test.sh -f general/parser/slimit.sim
./test.sh -f general/parser/select_with_tags.sim
./test.sh -f general/parser/interp.sim
#
./test.sh -f general/parser/interp.sim
./test.sh -f general/parser/tags_dynamically_specifiy.sim
./test.sh -f general/parser/groupby.sim
./test.sh -f general/parser/set_tag_vals.sim
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录