Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7b364249
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
7b364249
编写于
9月 01, 2020
作者:
S
Shengliang Guan
提交者:
GitHub
9月 01, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #3306 from taosdata/feature/td-1099
natural interval
上级
9b2f895e
e9673320
变更
17
隐藏空白更改
内联
并排
Showing
17 changed file
with
507 addition
and
121 deletion
+507
-121
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+2
-0
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+5
-2
src/client/src/tscProfile.c
src/client/src/tscProfile.c
+2
-2
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+34
-26
src/client/src/tscServer.c
src/client/src/tscServer.c
+1
-0
src/client/src/tscStream.c
src/client/src/tscStream.c
+59
-41
src/client/src/tscSubquery.c
src/client/src/tscSubquery.c
+4
-0
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+1
-0
src/common/inc/tname.h
src/common/inc/tname.h
+2
-0
src/common/src/tname.c
src/common/src/tname.c
+106
-19
src/inc/taosmsg.h
src/inc/taosmsg.h
+1
-0
src/os/inc/osTime.h
src/os/inc/osTime.h
+1
-0
src/os/src/detail/osTime.c
src/os/src/detail/osTime.c
+19
-0
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+2
-1
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+76
-26
src/query/src/qFill.c
src/query/src/qFill.c
+22
-4
tests/pytest/query/natualInterval.py
tests/pytest/query/natualInterval.py
+170
-0
未找到文件。
src/client/inc/tscUtil.h
浏览文件 @
7b364249
...
...
@@ -70,6 +70,8 @@ typedef struct SJoinSupporter {
SSubqueryState
*
pState
;
SSqlObj
*
pObj
;
// parent SqlObj
int32_t
subqueryIndex
;
// index of sub query
char
intervalTimeUnit
;
char
slidingTimeUnit
;
int64_t
intervalTime
;
// interval time
int64_t
slidingTime
;
// sliding time
SLimitVal
limit
;
// limit info
...
...
src/client/inc/tsclient.h
浏览文件 @
7b364249
...
...
@@ -229,8 +229,9 @@ typedef struct STableDataBlocks {
typedef
struct
SQueryInfo
{
int16_t
command
;
// the command may be different for each subclause, so keep it seperately.
uint32_t
type
;
// query/insert type
char
intervalTimeUnit
;
char
slidingTimeUnit
;
uint32_t
type
;
// query/insert type
STimeWindow
window
;
// query time window
int64_t
intervalTime
;
// aggregation time interval
int64_t
slidingTime
;
// sliding window in mseconds
...
...
@@ -366,6 +367,8 @@ typedef struct SSqlStream {
uint32_t
streamId
;
char
listed
;
bool
isProject
;
char
intervalTimeUnit
;
char
slidingTimeUnit
;
int16_t
precision
;
int64_t
num
;
// number of computing count
...
...
@@ -379,7 +382,7 @@ typedef struct SSqlStream {
int64_t
ctime
;
// stream created time
int64_t
stime
;
// stream next executed time
int64_t
etime
;
// stream end query time, when time is larger then etime, the stream will be closed
int64_t
interval
;
int64_t
interval
Time
;
int64_t
slidingTime
;
void
*
pTimer
;
...
...
src/client/src/tscProfile.c
浏览文件 @
7b364249
...
...
@@ -259,11 +259,11 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
pSdesc
->
num
=
htobe64
(
pStream
->
num
);
pSdesc
->
useconds
=
htobe64
(
pStream
->
useconds
);
pSdesc
->
stime
=
htobe64
(
pStream
->
stime
-
pStream
->
interval
);
pSdesc
->
stime
=
htobe64
(
pStream
->
stime
-
pStream
->
interval
Time
);
pSdesc
->
ctime
=
htobe64
(
pStream
->
ctime
);
pSdesc
->
slidingTime
=
htobe64
(
pStream
->
slidingTime
);
pSdesc
->
interval
=
htobe64
(
pStream
->
interval
);
pSdesc
->
interval
=
htobe64
(
pStream
->
interval
Time
);
pHeartbeat
->
numOfStreams
++
;
pSdesc
++
;
...
...
src/client/src/tscSQLParser.c
浏览文件 @
7b364249
...
...
@@ -587,21 +587,20 @@ int32_t parseIntervalClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQ
// interval is not null
SStrToken
*
t
=
&
pQuerySql
->
interval
;
if
(
getTimestampInUsFromStr
(
t
->
z
,
t
->
n
,
&
pQueryInfo
->
intervalTime
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
parseDuration
(
t
->
z
,
t
->
n
,
&
pQueryInfo
->
intervalTime
,
&
pQueryInfo
->
intervalTimeUnit
)
!=
TSDB_CODE_SUCCESS
)
{
return
TSDB_CODE_TSC_INVALID_SQL
;
}
// if the unit of time window value is millisecond, change the value from microsecond
if
(
tinfo
.
precision
==
TSDB_TIME_PRECISION_MILLI
)
{
pQueryInfo
->
intervalTime
=
pQueryInfo
->
intervalTime
/
1000
;
}
/* parser has filter the illegal type, no need to check here */
pQueryInfo
->
slidingTimeUnit
=
pQuerySql
->
interval
.
z
[
pQuerySql
->
interval
.
n
-
1
];
if
(
pQueryInfo
->
intervalTimeUnit
!=
'n'
&&
pQueryInfo
->
intervalTimeUnit
!=
'y'
)
{
// if the unit of time window value is millisecond, change the value from microsecond
if
(
tinfo
.
precision
==
TSDB_TIME_PRECISION_MILLI
)
{
pQueryInfo
->
intervalTime
=
pQueryInfo
->
intervalTime
/
1000
;
}
// interval cannot be less than 10 milliseconds
if
(
pQueryInfo
->
intervalTime
<
tsMinIntervalTime
)
{
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg2
);
// interval cannot be less than 10 milliseconds
if
(
pQueryInfo
->
intervalTime
<
tsMinIntervalTime
)
{
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg2
);
}
}
// for top/bottom + interval query, we do not add additional timestamp column in the front
...
...
@@ -666,6 +665,7 @@ int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu
const
char
*
msg0
=
"sliding value too small"
;
const
char
*
msg1
=
"sliding value no larger than the interval value"
;
const
char
*
msg2
=
"sliding value can not less than 1% of interval value"
;
const
char
*
msg3
=
"does not support sliding when interval is natual month/year"
;
const
static
int32_t
INTERVAL_SLIDING_FACTOR
=
100
;
...
...
@@ -673,21 +673,27 @@ int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu
STableComInfo
tinfo
=
tscGetTableInfo
(
pTableMetaInfo
->
pTableMeta
);
SStrToken
*
pSliding
=
&
pQuerySql
->
sliding
;
if
(
pSliding
->
n
!
=
0
)
{
getTimestampInUsFromStr
(
pSliding
->
z
,
pSliding
->
n
,
&
pQueryInfo
->
slidingTime
)
;
if
(
tinfo
.
precision
==
TSDB_TIME_PRECISION_MILLI
)
{
pQueryInfo
->
slidingTime
/=
1000
;
}
if
(
pSliding
->
n
=
=
0
)
{
pQueryInfo
->
slidingTimeUnit
=
pQueryInfo
->
intervalTimeUnit
;
pQueryInfo
->
slidingTime
=
pQueryInfo
->
intervalTime
;
return
TSDB_CODE_SUCCESS
;
}
if
(
pQueryInfo
->
slidingTime
<
tsMinSlidingTime
)
{
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg0
);
}
if
(
pQueryInfo
->
intervalTimeUnit
==
'n'
||
pQueryInfo
->
intervalTimeUnit
==
'y'
)
{
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg3
);
}
if
(
pQueryInfo
->
slidingTime
>
pQueryInfo
->
intervalTime
)
{
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg1
);
}
}
else
{
pQueryInfo
->
slidingTime
=
pQueryInfo
->
intervalTime
;
getTimestampInUsFromStr
(
pSliding
->
z
,
pSliding
->
n
,
&
pQueryInfo
->
slidingTime
);
if
(
tinfo
.
precision
==
TSDB_TIME_PRECISION_MILLI
)
{
pQueryInfo
->
slidingTime
/=
1000
;
}
if
(
pQueryInfo
->
slidingTime
<
tsMinSlidingTime
)
{
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg0
);
}
if
(
pQueryInfo
->
slidingTime
>
pQueryInfo
->
intervalTime
)
{
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg1
);
}
if
((
pQueryInfo
->
intervalTime
!=
0
)
&&
(
pQueryInfo
->
intervalTime
/
pQueryInfo
->
slidingTime
>
INTERVAL_SLIDING_FACTOR
))
{
...
...
@@ -4675,7 +4681,9 @@ int32_t validateSqlFunctionInStreamSql(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
const
char
*
msg0
=
"sample interval can not be less than 10ms."
;
const
char
*
msg1
=
"functions not allowed in select clause"
;
if
(
pQueryInfo
->
intervalTime
!=
0
&&
pQueryInfo
->
intervalTime
<
10
)
{
if
(
pQueryInfo
->
intervalTime
!=
0
&&
pQueryInfo
->
intervalTime
<
10
&&
pQueryInfo
->
intervalTimeUnit
!=
'n'
&&
pQueryInfo
->
intervalTimeUnit
!=
'y'
)
{
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg0
);
}
...
...
@@ -6161,7 +6169,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg5
);
}
if
(
pQueryInfo
->
intervalTime
>
0
)
{
if
(
pQueryInfo
->
intervalTime
>
0
&&
pQueryInfo
->
intervalTimeUnit
!=
'n'
&&
pQueryInfo
->
intervalTimeUnit
!=
'y'
)
{
int64_t
timeRange
=
ABS
(
pQueryInfo
->
window
.
skey
-
pQueryInfo
->
window
.
ekey
);
// number of result is not greater than 10,000,000
if
((
timeRange
==
0
)
||
(
timeRange
/
pQueryInfo
->
intervalTime
)
>
MAX_INTERVAL_TIME_WINDOW
)
{
...
...
src/client/src/tscServer.c
浏览文件 @
7b364249
...
...
@@ -673,6 +673,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg
->
numOfCols
=
htons
((
int16_t
)
taosArrayGetSize
(
pQueryInfo
->
colList
));
pQueryMsg
->
intervalTime
=
htobe64
(
pQueryInfo
->
intervalTime
);
pQueryMsg
->
slidingTime
=
htobe64
(
pQueryInfo
->
slidingTime
);
pQueryMsg
->
intervalTimeUnit
=
pQueryInfo
->
intervalTimeUnit
;
pQueryMsg
->
slidingTimeUnit
=
pQueryInfo
->
slidingTimeUnit
;
pQueryMsg
->
numOfGroupCols
=
htons
(
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
);
pQueryMsg
->
numOfTags
=
htonl
(
numOfTags
);
...
...
src/client/src/tscStream.c
浏览文件 @
7b364249
...
...
@@ -46,22 +46,23 @@ static bool isProjectStream(SQueryInfo* pQueryInfo) {
return
true
;
}
static
int64_t
tscGetRetryDelayTime
(
int64_t
slidingTime
,
int16_t
prec
)
{
static
int64_t
tscGetRetryDelayTime
(
SSqlStream
*
pStream
,
int64_t
slidingTime
,
int16_t
prec
)
{
float
retryRangeFactor
=
0
.
3
f
;
// change to ms
if
(
prec
==
TSDB_TIME_PRECISION_MICRO
)
{
slidingTime
=
slidingTime
/
1000
;
}
int64_t
retryDelta
=
(
int64_t
)(
tsStreamCompRetryDelay
*
retryRangeFactor
);
retryDelta
=
((
rand
()
%
retryDelta
)
+
tsStreamCompRetryDelay
)
*
1000L
;
if
(
slidingTime
<
retryDelta
)
{
return
slidingTime
;
}
else
{
return
retryDelta
;
if
(
pStream
->
intervalTimeUnit
!=
'n'
&&
pStream
->
intervalTimeUnit
!=
'y'
)
{
// change to ms
if
(
prec
==
TSDB_TIME_PRECISION_MICRO
)
{
slidingTime
=
slidingTime
/
1000
;
}
if
(
slidingTime
<
retryDelta
)
{
return
slidingTime
;
}
}
return
retryDelta
;
}
static
void
tscProcessStreamLaunchQuery
(
SSchedMsg
*
pMsg
)
{
...
...
@@ -86,7 +87,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
// failed to get meter/metric meta, retry in 10sec.
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
int64_t
retryDelayTime
=
tscGetRetryDelayTime
(
pStream
->
slidingTime
,
pStream
->
precision
);
int64_t
retryDelayTime
=
tscGetRetryDelayTime
(
pStream
,
pStream
->
slidingTime
,
pStream
->
precision
);
tscDebug
(
"%p stream:%p,get metermeta failed, retry in %"
PRId64
"ms"
,
pStream
->
pSql
,
pStream
,
retryDelayTime
);
tscSetRetryTimer
(
pStream
,
pSql
,
retryDelayTime
);
...
...
@@ -131,13 +132,17 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
}
if
(
etime
>
pStream
->
etime
)
{
etime
=
pStream
->
etime
;
}
else
if
(
pStream
->
intervalTimeUnit
!=
'y'
&&
pStream
->
intervalTimeUnit
!=
'n'
)
{
etime
=
pStream
->
stime
+
(
etime
-
pStream
->
stime
)
/
pStream
->
intervalTime
*
pStream
->
intervalTime
;
}
else
{
etime
=
pStream
->
stime
+
(
etime
-
pStream
->
stime
)
/
pStream
->
interval
*
pStream
->
interval
;
etime
=
taosGetIntervalStartTimestamp
(
etime
,
pStream
->
slidingTime
,
pStream
->
intervalTime
,
pStream
->
slidingTimeUnit
,
pStream
->
precision
)
;
}
pQueryInfo
->
window
.
ekey
=
etime
;
if
(
pQueryInfo
->
window
.
skey
>=
pQueryInfo
->
window
.
ekey
)
{
int64_t
timer
=
pStream
->
slidingTime
;
if
(
pStream
->
precision
==
TSDB_TIME_PRECISION_MICRO
)
{
if
(
pStream
->
intervalTimeUnit
==
'y'
||
pStream
->
intervalTimeUnit
==
'n'
)
{
timer
=
86400
*
1000l
;
}
else
if
(
pStream
->
precision
==
TSDB_TIME_PRECISION_MICRO
)
{
timer
/=
1000l
;
}
tscSetRetryTimer
(
pStream
,
pSql
,
timer
);
...
...
@@ -157,7 +162,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
static
void
tscProcessStreamQueryCallback
(
void
*
param
,
TAOS_RES
*
tres
,
int
numOfRows
)
{
SSqlStream
*
pStream
=
(
SSqlStream
*
)
param
;
if
(
tres
==
NULL
||
numOfRows
<
0
)
{
int64_t
retryDelay
=
tscGetRetryDelayTime
(
pStream
->
slidingTime
,
pStream
->
precision
);
int64_t
retryDelay
=
tscGetRetryDelayTime
(
pStream
,
pStream
->
slidingTime
,
pStream
->
precision
);
tscError
(
"%p stream:%p, query data failed, code:0x%08x, retry in %"
PRId64
"ms"
,
pStream
->
pSql
,
pStream
,
numOfRows
,
retryDelay
);
...
...
@@ -218,7 +223,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
SSqlObj
*
pSql
=
(
SSqlObj
*
)
res
;
if
(
pSql
==
NULL
||
numOfRows
<
0
)
{
int64_t
retryDelayTime
=
tscGetRetryDelayTime
(
pStream
->
slidingTime
,
pStream
->
precision
);
int64_t
retryDelayTime
=
tscGetRetryDelayTime
(
pStream
,
pStream
->
slidingTime
,
pStream
->
precision
);
tscError
(
"%p stream:%p, retrieve data failed, code:0x%08x, retry in %"
PRId64
"ms"
,
pSql
,
pStream
,
numOfRows
,
retryDelayTime
);
tscSetRetryTimer
(
pStream
,
pStream
->
pSql
,
retryDelayTime
);
...
...
@@ -241,7 +246,11 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
}
if
(
!
pStream
->
isProject
)
{
pStream
->
stime
+=
pStream
->
slidingTime
;
if
(
pStream
->
intervalTimeUnit
==
'y'
||
pStream
->
intervalTimeUnit
==
'n'
)
{
pStream
->
stime
=
taosAddNatualInterval
(
pStream
->
stime
,
pStream
->
slidingTime
,
pStream
->
slidingTimeUnit
,
pStream
->
precision
);
}
else
{
pStream
->
stime
+=
pStream
->
slidingTime
;
}
}
// actually only one row is returned. this following is not necessary
taos_fetch_rows_a
(
res
,
tscProcessStreamRetrieveResult
,
pStream
);
...
...
@@ -301,7 +310,7 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer)
now
+
timer
,
timer
,
delay
,
pStream
->
stime
,
etime
);
}
else
{
tscDebug
(
"%p stream:%p, next start at %"
PRId64
", in %"
PRId64
"ms. delay:%"
PRId64
"ms qrange %"
PRId64
"-%"
PRId64
,
pStream
->
pSql
,
pStream
,
pStream
->
stime
,
timer
,
delay
,
pStream
->
stime
-
pStream
->
interval
,
pStream
->
stime
-
1
);
pStream
->
stime
,
timer
,
delay
,
pStream
->
stime
-
pStream
->
interval
Time
,
pStream
->
stime
-
1
);
}
pSql
->
cmd
.
command
=
TSDB_SQL_SELECT
;
...
...
@@ -311,23 +320,26 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer)
}
static
int64_t
getLaunchTimeDelay
(
const
SSqlStream
*
pStream
)
{
int64_t
delayDelta
=
(
int64_t
)(
pStream
->
slidingTime
*
tsStreamComputDelayRatio
);
int64_t
maxDelay
=
(
pStream
->
precision
==
TSDB_TIME_PRECISION_MICRO
)
?
tsMaxStreamComputDelay
*
1000L
:
tsMaxStreamComputDelay
;
if
(
delayDelta
>
maxDelay
)
{
delayDelta
=
maxDelay
;
}
int64_t
remainTimeWindow
=
pStream
->
slidingTime
-
delayDelta
;
if
(
maxDelay
>
remainTimeWindow
)
{
maxDelay
=
(
int64_t
)(
remainTimeWindow
/
1
.
5
f
);
int64_t
delayDelta
=
maxDelay
;
if
(
pStream
->
intervalTimeUnit
!=
'n'
&&
pStream
->
intervalTimeUnit
!=
'y'
)
{
delayDelta
=
pStream
->
slidingTime
*
tsStreamComputDelayRatio
;
if
(
delayDelta
>
maxDelay
)
{
delayDelta
=
maxDelay
;
}
int64_t
remainTimeWindow
=
pStream
->
slidingTime
-
delayDelta
;
if
(
maxDelay
>
remainTimeWindow
)
{
maxDelay
=
(
int64_t
)(
remainTimeWindow
/
1
.
5
f
);
}
}
int64_t
currentDelay
=
(
rand
()
%
maxDelay
);
// a random number
currentDelay
+=
delayDelta
;
assert
(
currentDelay
<
pStream
->
slidingTime
);
if
(
pStream
->
intervalTimeUnit
!=
'n'
&&
pStream
->
intervalTimeUnit
!=
'y'
)
{
assert
(
currentDelay
<
pStream
->
slidingTime
);
}
return
currentDelay
;
}
...
...
@@ -354,7 +366,8 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
return
;
}
}
else
{
if
((
pStream
->
stime
-
pStream
->
interval
)
>=
pStream
->
etime
)
{
int64_t
stime
=
taosGetIntervalStartTimestamp
(
pStream
->
stime
-
1
,
pStream
->
intervalTime
,
pStream
->
intervalTime
,
pStream
->
intervalTimeUnit
,
pStream
->
precision
);
if
(
stime
>=
pStream
->
etime
)
{
tscDebug
(
"%p stream:%p, stime:%"
PRId64
" is larger than end time: %"
PRId64
", stop the stream"
,
pStream
->
pSql
,
pStream
,
pStream
->
stime
,
pStream
->
etime
);
// TODO : How to terminate stream here
...
...
@@ -387,24 +400,24 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
if
(
pQueryInfo
->
intervalTime
<
minIntervalTime
)
{
if
(
pQueryInfo
->
intervalTime
Unit
!=
'n'
&&
pQueryInfo
->
intervalTimeUnit
!=
'y'
&&
pQueryInfo
->
intervalTime
<
minIntervalTime
)
{
tscWarn
(
"%p stream:%p, original sample interval:%ld too small, reset to:%"
PRId64
,
pSql
,
pStream
,
pQueryInfo
->
intervalTime
,
minIntervalTime
);
pQueryInfo
->
intervalTime
=
minIntervalTime
;
}
pStream
->
interval
=
pQueryInfo
->
intervalTime
;
// it shall be derived from sql string
pStream
->
intervalTimeUnit
=
pQueryInfo
->
intervalTimeUnit
;
pStream
->
intervalTime
=
pQueryInfo
->
intervalTime
;
// it shall be derived from sql string
if
(
pQueryInfo
->
slidingTime
=
=
0
)
{
if
(
pQueryInfo
->
slidingTime
<
=
0
)
{
pQueryInfo
->
slidingTime
=
pQueryInfo
->
intervalTime
;
pQueryInfo
->
slidingTimeUnit
=
pQueryInfo
->
intervalTimeUnit
;
}
int64_t
minSlidingTime
=
(
pStream
->
precision
==
TSDB_TIME_PRECISION_MICRO
)
?
tsMinSlidingTime
*
1000L
:
tsMinSlidingTime
;
if
(
pQueryInfo
->
slidingTime
==
-
1
)
{
pQueryInfo
->
slidingTime
=
pQueryInfo
->
intervalTime
;
}
else
if
(
pQueryInfo
->
slidingTime
<
minSlidingTime
)
{
if
(
pQueryInfo
->
intervalTimeUnit
!=
'n'
&&
pQueryInfo
->
intervalTimeUnit
!=
'y'
&&
pQueryInfo
->
slidingTime
<
minSlidingTime
)
{
tscWarn
(
"%p stream:%p, original sliding value:%"
PRId64
" too small, reset to:%"
PRId64
,
pSql
,
pStream
,
pQueryInfo
->
slidingTime
,
minSlidingTime
);
...
...
@@ -418,6 +431,7 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
pQueryInfo
->
slidingTime
=
pQueryInfo
->
intervalTime
;
}
pStream
->
slidingTimeUnit
=
pQueryInfo
->
slidingTimeUnit
;
pStream
->
slidingTime
=
pQueryInfo
->
slidingTime
;
if
(
pStream
->
isProject
)
{
...
...
@@ -431,7 +445,7 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
if
(
pStream
->
isProject
)
{
// no data in table, flush all data till now to destination meter, 10sec delay
pStream
->
interval
=
tsProjectExecInterval
;
pStream
->
interval
Time
=
tsProjectExecInterval
;
pStream
->
slidingTime
=
tsProjectExecInterval
;
if
(
stime
!=
0
)
{
// first projection start from the latest event timestamp
...
...
@@ -442,11 +456,15 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
}
}
else
{
// timewindow based aggregation stream
if
(
stime
==
0
)
{
// no data in meter till now
stime
=
((
int64_t
)
taosGetTimestamp
(
pStream
->
precision
)
/
pStream
->
interval
)
*
pStream
->
interval
;
stime
-=
pStream
->
interval
;
tscWarn
(
"%p stream:%p, last timestamp:0, reset to:%"
PRId64
,
pSql
,
pStream
,
stime
);
stime
=
pQueryInfo
->
window
.
skey
;
if
(
stime
==
INT64_MIN
)
{
stime
=
(
int64_t
)
taosGetTimestamp
(
pStream
->
precision
);
stime
=
taosGetIntervalStartTimestamp
(
stime
,
pStream
->
intervalTime
,
pStream
->
intervalTime
,
pStream
->
intervalTimeUnit
,
pStream
->
precision
);
stime
=
taosGetIntervalStartTimestamp
(
stime
-
1
,
pStream
->
intervalTime
,
pStream
->
intervalTime
,
pStream
->
intervalTimeUnit
,
pStream
->
precision
);
tscWarn
(
"%p stream:%p, last timestamp:0, reset to:%"
PRId64
,
pSql
,
pStream
,
stime
);
}
}
else
{
int64_t
newStime
=
(
stime
/
pStream
->
interval
)
*
pStream
->
interval
;
int64_t
newStime
=
taosGetIntervalStartTimestamp
(
stime
,
pStream
->
intervalTime
,
pStream
->
intervalTime
,
pStream
->
intervalTimeUnit
,
pStream
->
precision
)
;
if
(
newStime
!=
stime
)
{
tscWarn
(
"%p stream:%p, last timestamp:%"
PRId64
", reset to:%"
PRId64
,
pSql
,
pStream
,
stime
,
newStime
);
stime
=
newStime
;
...
...
@@ -516,7 +534,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
taosTmrReset
(
tscProcessStreamTimer
,
(
int32_t
)
starttime
,
pStream
,
tscTmr
,
&
pStream
->
pTimer
);
tscDebug
(
"%p stream:%p is opened, query on:%s, interval:%"
PRId64
", sliding:%"
PRId64
", first launched in:%"
PRId64
", sql:%s"
,
pSql
,
pStream
,
pTableMetaInfo
->
name
,
pStream
->
interval
,
pStream
->
slidingTime
,
starttime
,
pSql
->
sqlstr
);
pStream
,
pTableMetaInfo
->
name
,
pStream
->
interval
Time
,
pStream
->
slidingTime
,
starttime
,
pSql
->
sqlstr
);
}
TAOS_STREAM
*
taos_open_stream
(
TAOS
*
taos
,
const
char
*
sqlstr
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
TAOS_ROW
row
),
...
...
src/client/src/tscSubquery.c
浏览文件 @
7b364249
...
...
@@ -178,6 +178,8 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, in
pSupporter
->
subqueryIndex
=
index
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
pSql
->
cmd
.
clauseIndex
);
pSupporter
->
intervalTimeUnit
=
pQueryInfo
->
intervalTimeUnit
;
pSupporter
->
slidingTime
=
pQueryInfo
->
slidingTimeUnit
;
pSupporter
->
intervalTime
=
pQueryInfo
->
intervalTime
;
pSupporter
->
slidingTime
=
pQueryInfo
->
slidingTime
;
pSupporter
->
limit
=
pQueryInfo
->
limit
;
...
...
@@ -309,6 +311,8 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
// set the second stage sub query for join process
TSDB_QUERY_SET_TYPE
(
pQueryInfo
->
type
,
TSDB_QUERY_TYPE_JOIN_SEC_STAGE
);
pQueryInfo
->
intervalTimeUnit
=
pSupporter
->
intervalTimeUnit
;
pQueryInfo
->
slidingTimeUnit
=
pSupporter
->
slidingTimeUnit
;
pQueryInfo
->
intervalTime
=
pSupporter
->
intervalTime
;
pQueryInfo
->
slidingTime
=
pSupporter
->
slidingTime
;
pQueryInfo
->
groupbyExpr
=
pSupporter
->
groupbyExpr
;
...
...
src/client/src/tscUtil.c
浏览文件 @
7b364249
...
...
@@ -1830,6 +1830,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
pNewQueryInfo
->
command
=
pQueryInfo
->
command
;
pNewQueryInfo
->
intervalTimeUnit
=
pQueryInfo
->
intervalTimeUnit
;
pNewQueryInfo
->
slidingTimeUnit
=
pQueryInfo
->
slidingTimeUnit
;
pNewQueryInfo
->
intervalTime
=
pQueryInfo
->
intervalTime
;
pNewQueryInfo
->
slidingTime
=
pQueryInfo
->
slidingTime
;
...
...
src/common/inc/tname.h
浏览文件 @
7b364249
...
...
@@ -35,6 +35,8 @@ bool tscValidateTableNameLength(size_t len);
SColumnFilterInfo
*
tscFilterInfoClone
(
const
SColumnFilterInfo
*
src
,
int32_t
numOfFilters
);
int64_t
taosAddNatualInterval
(
int64_t
key
,
int64_t
intervalTime
,
char
timeUnit
,
int16_t
precision
);
int32_t
taosCountNatualInterval
(
int64_t
skey
,
int64_t
ekey
,
int64_t
intervalTime
,
char
timeUnit
,
int16_t
precision
);
int64_t
taosGetIntervalStartTimestamp
(
int64_t
startTime
,
int64_t
slidingTime
,
int64_t
intervalTime
,
char
timeUnit
,
int16_t
precision
);
#endif // TDENGINE_NAME_H
src/common/src/tname.c
浏览文件 @
7b364249
...
...
@@ -100,33 +100,120 @@ SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numO
return
pFilter
;
}
int64_t
taosAddNatualInterval
(
int64_t
key
,
int64_t
intervalTime
,
char
timeUnit
,
int16_t
precision
)
{
key
/=
1000
;
if
(
precision
==
TSDB_TIME_PRECISION_MICRO
)
{
key
/=
1000
;
}
struct
tm
tm
;
time_t
t
=
(
time_t
)
key
;
localtime_r
(
&
t
,
&
tm
);
if
(
timeUnit
==
'y'
)
{
intervalTime
*=
12
;
}
int
mon
=
tm
.
tm_year
*
12
+
tm
.
tm_mon
+
intervalTime
;
tm
.
tm_year
=
mon
/
12
;
tm
.
tm_mon
=
mon
%
12
;
key
=
mktime
(
&
tm
)
*
1000L
;
if
(
precision
==
TSDB_TIME_PRECISION_MICRO
)
{
key
*=
1000L
;
}
return
key
;
}
int32_t
taosCountNatualInterval
(
int64_t
skey
,
int64_t
ekey
,
int64_t
intervalTime
,
char
timeUnit
,
int16_t
precision
)
{
skey
/=
1000
;
ekey
/=
1000
;
if
(
precision
==
TSDB_TIME_PRECISION_MICRO
)
{
skey
/=
1000
;
ekey
/=
1000
;
}
if
(
ekey
<
skey
)
{
int64_t
tmp
=
ekey
;
ekey
=
skey
;
skey
=
tmp
;
}
struct
tm
tm
;
time_t
t
=
(
time_t
)
skey
;
localtime_r
(
&
t
,
&
tm
);
int
smon
=
tm
.
tm_year
*
12
+
tm
.
tm_mon
;
t
=
(
time_t
)
ekey
;
localtime_r
(
&
t
,
&
tm
);
int
emon
=
tm
.
tm_year
*
12
+
tm
.
tm_mon
;
if
(
timeUnit
==
'y'
)
{
intervalTime
*=
12
;
}
return
(
emon
-
smon
)
/
(
int32_t
)
intervalTime
;
}
int64_t
taosGetIntervalStartTimestamp
(
int64_t
startTime
,
int64_t
slidingTime
,
int64_t
intervalTime
,
char
timeUnit
,
int16_t
precision
)
{
if
(
slidingTime
==
0
)
{
return
startTime
;
}
int64_t
start
=
startTime
;
if
(
timeUnit
==
'n'
||
timeUnit
==
'y'
)
{
start
/=
1000
;
if
(
precision
==
TSDB_TIME_PRECISION_MICRO
)
{
start
/=
1000
;
}
struct
tm
tm
;
time_t
t
=
(
time_t
)
start
;
localtime_r
(
&
t
,
&
tm
);
tm
.
tm_sec
=
0
;
tm
.
tm_min
=
0
;
tm
.
tm_hour
=
0
;
tm
.
tm_mday
=
1
;
if
(
timeUnit
==
'y'
)
{
tm
.
tm_mon
=
0
;
tm
.
tm_year
=
tm
.
tm_year
/
slidingTime
*
slidingTime
;
}
else
{
int
mon
=
tm
.
tm_year
*
12
+
tm
.
tm_mon
;
mon
=
mon
/
slidingTime
*
slidingTime
;
tm
.
tm_year
=
mon
/
12
;
tm
.
tm_mon
=
mon
%
12
;
}
int64_t
start
=
((
startTime
-
intervalTime
)
/
slidingTime
+
1
)
*
slidingTime
;
if
(
!
(
timeUnit
==
'u'
||
timeUnit
==
'a'
||
timeUnit
==
'm'
||
timeUnit
==
's'
||
timeUnit
==
'h'
))
{
/*
* here we revised the start time of day according to the local time zone,
* but in case of DST, the start time of one day need to be dynamically decided.
*/
// todo refactor to extract function that is available for Linux/Windows/Mac platform
#if defined(WINDOWS) && _MSC_VER >= 1900
// see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019
int64_t
timezone
=
_timezone
;
int32_t
daylight
=
_daylight
;
char
**
tzname
=
_tzname
;
#endif
start
=
mktime
(
&
tm
)
*
1000L
;
if
(
precision
==
TSDB_TIME_PRECISION_MICRO
)
{
start
*=
1000L
;
}
}
else
{
start
=
((
start
-
intervalTime
)
/
slidingTime
+
1
)
*
slidingTime
;
if
(
timeUnit
==
'd'
||
timeUnit
==
'w'
)
{
/*
* here we revised the start time of day according to the local time zone,
* but in case of DST, the start time of one day need to be dynamically decided.
*/
// todo refactor to extract function that is available for Linux/Windows/Mac platform
#if defined(WINDOWS) && _MSC_VER >= 1900
// see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019
int64_t
timezone
=
_timezone
;
int32_t
daylight
=
_daylight
;
char
**
tzname
=
_tzname
;
#endif
int64_t
t
=
(
precision
==
TSDB_TIME_PRECISION_MILLI
)
?
MILLISECOND_PER_SECOND
:
MILLISECOND_PER_SECOND
*
1000L
;
start
+=
timezone
*
t
;
}
int64_t
t
=
(
precision
==
TSDB_TIME_PRECISION_MILLI
)
?
MILLISECOND_PER_SECOND
:
MILLISECOND_PER_SECOND
*
1000L
;
start
+=
timezone
*
t
;
int64_t
end
=
start
+
intervalTime
-
1
;
if
(
end
<
startTime
)
{
start
+=
slidingTime
;
}
}
int64_t
end
=
start
+
intervalTime
-
1
;
if
(
end
<
startTime
)
{
start
+=
slidingTime
;
}
return
start
;
}
...
...
src/inc/taosmsg.h
浏览文件 @
7b364249
...
...
@@ -456,6 +456,7 @@ typedef struct {
int64_t
intervalTime
;
// time interval for aggregation, in million second
int64_t
intervalOffset
;
// start offset for interval query
int64_t
slidingTime
;
// value for sliding window
char
intervalTimeUnit
;
char
slidingTimeUnit
;
// time interval type, for revisement of interval(1d)
uint16_t
tagCondLen
;
// tag length in current query
int16_t
numOfGroupCols
;
// num of group by columns
...
...
src/os/inc/osTime.h
浏览文件 @
7b364249
...
...
@@ -64,6 +64,7 @@ static FORCE_INLINE int64_t taosGetTimestamp(int32_t precision) {
}
int32_t
getTimestampInUsFromStr
(
char
*
token
,
int32_t
tokenlen
,
int64_t
*
ts
);
int32_t
parseDuration
(
const
char
*
token
,
int32_t
tokenLen
,
int64_t
*
duration
,
char
*
unit
);
int32_t
taosParseTime
(
char
*
timestr
,
int64_t
*
time
,
int32_t
len
,
int32_t
timePrec
,
int8_t
dayligth
);
void
deltaToUtcInitOnce
();
...
...
src/os/src/detail/osTime.c
浏览文件 @
7b364249
...
...
@@ -319,6 +319,8 @@ int32_t parseLocaltimeWithDst(char* timestr, int64_t* time, int32_t timePrec) {
*
time
=
factor
*
seconds
+
fraction
;
return
0
;
}
static
int32_t
getTimestampInUsFromStrImpl
(
int64_t
val
,
char
unit
,
int64_t
*
result
)
{
*
result
=
val
;
...
...
@@ -384,6 +386,23 @@ int32_t getTimestampInUsFromStr(char* token, int32_t tokenlen, int64_t* ts) {
return
getTimestampInUsFromStrImpl
(
timestamp
,
token
[
tokenlen
-
1
],
ts
);
}
int32_t
parseDuration
(
const
char
*
token
,
int32_t
tokenLen
,
int64_t
*
duration
,
char
*
unit
)
{
errno
=
0
;
/* get the basic numeric value */
*
duration
=
strtoll
(
token
,
NULL
,
10
);
if
(
errno
!=
0
)
{
return
-
1
;
}
*
unit
=
token
[
tokenLen
-
1
];
if
(
*
unit
==
'n'
||
*
unit
==
'y'
)
{
return
0
;
}
return
getTimestampInUsFromStrImpl
(
*
duration
,
*
unit
,
duration
);
}
// internal function, when program is paused in debugger,
// one can call this function from debugger to print a
// timestamp as human readable string, for example (gdb):
...
...
src/query/inc/qExecutor.h
浏览文件 @
7b364249
...
...
@@ -132,11 +132,12 @@ typedef struct SQueryCostInfo {
typedef
struct
SQuery
{
int16_t
numOfCols
;
int16_t
numOfTags
;
char
intervalTimeUnit
;
char
slidingTimeUnit
;
// interval data type, used for daytime revise
SOrderVal
order
;
STimeWindow
window
;
int64_t
intervalTime
;
int64_t
slidingTime
;
// sliding time for sliding window query
char
slidingTimeUnit
;
// interval data type, used for daytime revise
int16_t
precision
;
int16_t
numOfOutput
;
int16_t
fillType
;
...
...
src/query/src/qExecutor.c
浏览文件 @
7b364249
...
...
@@ -137,13 +137,44 @@ static void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv);
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->intervalTime > 0)
// previous time window may not be of the same size of pQuery->intervalTime
#define GET_NEXT_TIMEWINDOW(_q, tw) \
do { \
int32_t factor = GET_FORWARD_DIRECTION_FACTOR((_q)->order.order); \
(tw)->skey += ((_q)->slidingTime * factor); \
(tw)->ekey = (tw)->skey + ((_q)->intervalTime - 1); \
} while (0)
static
void
getNextTimeWindow
(
SQuery
*
pQuery
,
STimeWindow
*
tw
)
{
int32_t
factor
=
GET_FORWARD_DIRECTION_FACTOR
(
pQuery
->
order
.
order
);
if
(
pQuery
->
intervalTimeUnit
!=
'n'
&&
pQuery
->
intervalTimeUnit
!=
'y'
)
{
tw
->
skey
+=
pQuery
->
slidingTime
*
factor
;
tw
->
ekey
=
tw
->
skey
+
pQuery
->
intervalTime
-
1
;
return
;
}
int64_t
key
=
tw
->
skey
/
1000
,
interval
=
pQuery
->
intervalTime
;
if
(
pQuery
->
precision
==
TSDB_TIME_PRECISION_MICRO
)
{
key
/=
1000
;
}
if
(
pQuery
->
intervalTimeUnit
==
'y'
)
{
interval
*=
12
;
}
struct
tm
tm
;
time_t
t
=
(
time_t
)
key
;
localtime_r
(
&
t
,
&
tm
);
int
mon
=
tm
.
tm_year
*
12
+
tm
.
tm_mon
+
interval
*
factor
;
tm
.
tm_year
=
mon
/
12
;
tm
.
tm_mon
=
mon
%
12
;
tw
->
skey
=
mktime
(
&
tm
)
*
1000L
;
mon
+=
interval
;
tm
.
tm_year
=
mon
/
12
;
tm
.
tm_mon
=
mon
%
12
;
tw
->
ekey
=
mktime
(
&
tm
)
*
1000L
;
if
(
pQuery
->
precision
==
TSDB_TIME_PRECISION_MICRO
)
{
tw
->
skey
*=
1000L
;
tw
->
ekey
*=
1000L
;
}
tw
->
ekey
-=
1
;
}
#define GET_NEXT_TIMEWINDOW(_q, tw) getNextTimeWindow((_q), (tw))
#define SET_STABLE_QUERY_OVER(_q) ((_q)->tableIndex = (int32_t)((_q)->tableqinfoGroupInfo.numOfTables))
#define IS_STASBLE_QUERY_OVER(_q) ((_q)->tableIndex >= (int32_t)((_q)->tableqinfoGroupInfo.numOfTables))
...
...
@@ -467,9 +498,13 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
static
STimeWindow
getActiveTimeWindow
(
SWindowResInfo
*
pWindowResInfo
,
int64_t
ts
,
SQuery
*
pQuery
)
{
STimeWindow
w
=
{
0
};
if
(
pWindowResInfo
->
curIndex
==
-
1
)
{
// the first window, from the previous stored value
if
(
pWindowResInfo
->
curIndex
==
-
1
)
{
// the first window, from the previous stored value
w
.
skey
=
pWindowResInfo
->
prevSKey
;
w
.
ekey
=
w
.
skey
+
pQuery
->
intervalTime
-
1
;
if
(
pQuery
->
intervalTimeUnit
==
'n'
||
pQuery
->
intervalTimeUnit
==
'y'
)
{
w
.
ekey
=
taosAddNatualInterval
(
w
.
skey
,
pQuery
->
intervalTime
,
pQuery
->
intervalTimeUnit
,
pQuery
->
precision
)
-
1
;
}
else
{
w
.
ekey
=
w
.
skey
+
pQuery
->
intervalTime
-
1
;
}
}
else
{
int32_t
slot
=
curTimeWindowIndex
(
pWindowResInfo
);
SWindowResult
*
pWindowRes
=
getWindowResult
(
pWindowResInfo
,
slot
);
...
...
@@ -477,19 +512,24 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t
}
if
(
w
.
skey
>
ts
||
w
.
ekey
<
ts
)
{
int64_t
st
=
w
.
skey
;
if
(
pQuery
->
intervalTimeUnit
==
'n'
||
pQuery
->
intervalTimeUnit
==
'y'
)
{
w
.
skey
=
taosGetIntervalStartTimestamp
(
ts
,
pQuery
->
slidingTime
,
pQuery
->
intervalTime
,
pQuery
->
intervalTimeUnit
,
pQuery
->
precision
);
w
.
ekey
=
taosAddNatualInterval
(
w
.
skey
,
pQuery
->
intervalTime
,
pQuery
->
intervalTimeUnit
,
pQuery
->
precision
)
-
1
;
}
else
{
int64_t
st
=
w
.
skey
;
if
(
st
>
ts
)
{
st
-=
((
st
-
ts
+
pQuery
->
slidingTime
-
1
)
/
pQuery
->
slidingTime
)
*
pQuery
->
slidingTime
;
}
if
(
st
>
ts
)
{
st
-=
((
st
-
ts
+
pQuery
->
slidingTime
-
1
)
/
pQuery
->
slidingTime
)
*
pQuery
->
slidingTime
;
}
int64_t
et
=
st
+
pQuery
->
intervalTime
-
1
;
if
(
et
<
ts
)
{
st
+=
((
ts
-
et
+
pQuery
->
slidingTime
-
1
)
/
pQuery
->
slidingTime
)
*
pQuery
->
slidingTime
;
}
int64_t
et
=
st
+
pQuery
->
intervalTime
-
1
;
if
(
et
<
ts
)
{
st
+=
((
ts
-
et
+
pQuery
->
slidingTime
-
1
)
/
pQuery
->
slidingTime
)
*
pQuery
->
slidingTime
;
}
w
.
skey
=
st
;
w
.
ekey
=
w
.
skey
+
pQuery
->
intervalTime
-
1
;
w
.
skey
=
st
;
w
.
ekey
=
w
.
skey
+
pQuery
->
intervalTime
-
1
;
}
}
/*
...
...
@@ -814,14 +854,22 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow
*/
if
(
QUERY_IS_ASC_QUERY
(
pQuery
)
&&
primaryKeys
[
startPos
]
>
pNext
->
ekey
)
{
TSKEY
next
=
primaryKeys
[
startPos
];
pNext
->
ekey
+=
((
next
-
pNext
->
ekey
+
pQuery
->
slidingTime
-
1
)
/
pQuery
->
slidingTime
)
*
pQuery
->
slidingTime
;
pNext
->
skey
=
pNext
->
ekey
-
pQuery
->
intervalTime
+
1
;
if
(
pQuery
->
intervalTimeUnit
==
'n'
||
pQuery
->
intervalTimeUnit
==
'y'
)
{
pNext
->
skey
=
taosGetIntervalStartTimestamp
(
next
,
pQuery
->
slidingTime
,
pQuery
->
intervalTime
,
pQuery
->
intervalTimeUnit
,
pQuery
->
precision
);
pNext
->
ekey
=
taosAddNatualInterval
(
pNext
->
skey
,
pQuery
->
intervalTime
,
pQuery
->
intervalTimeUnit
,
pQuery
->
precision
)
-
1
;
}
else
{
pNext
->
ekey
+=
((
next
-
pNext
->
ekey
+
pQuery
->
slidingTime
-
1
)
/
pQuery
->
slidingTime
)
*
pQuery
->
slidingTime
;
pNext
->
skey
=
pNext
->
ekey
-
pQuery
->
intervalTime
+
1
;
}
}
else
if
((
!
QUERY_IS_ASC_QUERY
(
pQuery
))
&&
primaryKeys
[
startPos
]
<
pNext
->
skey
)
{
TSKEY
next
=
primaryKeys
[
startPos
];
pNext
->
skey
-=
((
pNext
->
skey
-
next
+
pQuery
->
slidingTime
-
1
)
/
pQuery
->
slidingTime
)
*
pQuery
->
slidingTime
;
pNext
->
ekey
=
pNext
->
skey
+
pQuery
->
intervalTime
-
1
;
if
(
pQuery
->
intervalTimeUnit
==
'n'
||
pQuery
->
intervalTimeUnit
==
'y'
)
{
pNext
->
skey
=
taosGetIntervalStartTimestamp
(
next
,
pQuery
->
slidingTime
,
pQuery
->
intervalTime
,
pQuery
->
intervalTimeUnit
,
pQuery
->
precision
);
pNext
->
ekey
=
taosAddNatualInterval
(
pNext
->
skey
,
pQuery
->
intervalTime
,
pQuery
->
intervalTimeUnit
,
pQuery
->
precision
)
-
1
;
}
else
{
pNext
->
skey
-=
((
pNext
->
skey
-
next
+
pQuery
->
slidingTime
-
1
)
/
pQuery
->
slidingTime
)
*
pQuery
->
slidingTime
;
pNext
->
ekey
=
pNext
->
skey
+
pQuery
->
intervalTime
-
1
;
}
}
return
startPos
;
...
...
@@ -1804,7 +1852,8 @@ void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int6
if
(
keyFirst
>
(
INT64_MAX
-
pQuery
->
intervalTime
))
{
assert
(
keyLast
-
keyFirst
<
pQuery
->
intervalTime
);
win
->
ekey
=
INT64_MAX
;
return
;
}
else
if
(
pQuery
->
intervalTimeUnit
==
'n'
||
pQuery
->
intervalTimeUnit
==
'y'
)
{
win
->
ekey
=
taosAddNatualInterval
(
win
->
skey
,
pQuery
->
intervalTime
,
pQuery
->
intervalTimeUnit
,
pQuery
->
precision
)
-
1
;
}
else
{
win
->
ekey
=
win
->
skey
+
pQuery
->
intervalTime
-
1
;
}
...
...
@@ -6016,6 +6065,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
pQuery
->
pGroupbyExpr
=
pGroupbyExpr
;
pQuery
->
intervalTime
=
pQueryMsg
->
intervalTime
;
pQuery
->
slidingTime
=
pQueryMsg
->
slidingTime
;
pQuery
->
intervalTimeUnit
=
pQueryMsg
->
intervalTimeUnit
;
pQuery
->
slidingTimeUnit
=
pQueryMsg
->
slidingTimeUnit
;
pQuery
->
fillType
=
pQueryMsg
->
fillType
;
pQuery
->
numOfTags
=
pQueryMsg
->
numOfTags
;
...
...
src/query/src/qFill.c
浏览文件 @
7b364249
...
...
@@ -179,14 +179,22 @@ int64_t getFilledNumOfRes(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows
if
(
numOfRows
>
0
)
{
// still fill gap within current data block, not generating data after the result set.
TSKEY
lastKey
=
tsList
[
pFillInfo
->
numOfRows
-
1
];
numOfRes
=
(
int64_t
)(
ABS
(
lastKey
-
pFillInfo
->
start
)
/
pFillInfo
->
slidingTime
)
+
1
;
if
(
pFillInfo
->
slidingUnit
!=
'y'
&&
pFillInfo
->
slidingUnit
!=
'n'
)
{
numOfRes
=
(
int64_t
)(
ABS
(
lastKey
-
pFillInfo
->
start
)
/
pFillInfo
->
slidingTime
)
+
1
;
}
else
{
numOfRes
=
taosCountNatualInterval
(
lastKey
,
pFillInfo
->
start
,
pFillInfo
->
slidingTime
,
pFillInfo
->
slidingUnit
,
pFillInfo
->
precision
)
+
1
;
}
assert
(
numOfRes
>=
numOfRows
);
}
else
{
// reach the end of data
if
((
ekey1
<
pFillInfo
->
start
&&
FILL_IS_ASC_FILL
(
pFillInfo
))
||
(
ekey1
>
pFillInfo
->
start
&&
!
FILL_IS_ASC_FILL
(
pFillInfo
)))
{
return
0
;
}
else
{
// the numOfRes rows are all filled with specified policy
}
// the numOfRes rows are all filled with specified policy
if
(
pFillInfo
->
slidingUnit
!=
'y'
&&
pFillInfo
->
slidingUnit
!=
'n'
)
{
numOfRes
=
(
ABS
(
ekey1
-
pFillInfo
->
start
)
/
pFillInfo
->
slidingTime
)
+
1
;
}
else
{
numOfRes
=
taosCountNatualInterval
(
ekey1
,
pFillInfo
->
start
,
pFillInfo
->
slidingTime
,
pFillInfo
->
slidingUnit
,
pFillInfo
->
precision
)
+
1
;
}
}
...
...
@@ -366,7 +374,12 @@ static void doFillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* nu
setTagsValue
(
pFillInfo
,
data
,
*
num
);
}
pFillInfo
->
start
+=
(
pFillInfo
->
slidingTime
*
step
);
// TODO natual sliding time
if
(
pFillInfo
->
slidingUnit
!=
'n'
&&
pFillInfo
->
slidingUnit
!=
'y'
)
{
pFillInfo
->
start
+=
(
pFillInfo
->
slidingTime
*
step
);
}
else
{
pFillInfo
->
start
=
taosAddNatualInterval
(
pFillInfo
->
start
,
pFillInfo
->
slidingTime
*
step
,
pFillInfo
->
slidingUnit
,
pFillInfo
->
precision
);
}
pFillInfo
->
numOfCurrent
++
;
(
*
num
)
+=
1
;
...
...
@@ -473,7 +486,12 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu
// set the tag value for final result
setTagsValue
(
pFillInfo
,
data
,
num
);
pFillInfo
->
start
+=
(
pFillInfo
->
slidingTime
*
step
);
// TODO natual sliding time
if
(
pFillInfo
->
slidingUnit
!=
'n'
&&
pFillInfo
->
slidingUnit
!=
'y'
)
{
pFillInfo
->
start
+=
(
pFillInfo
->
slidingTime
*
step
);
}
else
{
pFillInfo
->
start
=
taosAddNatualInterval
(
pFillInfo
->
start
,
pFillInfo
->
slidingTime
*
step
,
pFillInfo
->
slidingUnit
,
pFillInfo
->
precision
);
}
pFillInfo
->
rowIdx
+=
1
;
pFillInfo
->
numOfCurrent
+=
1
;
...
...
tests/pytest/query/natualInterval.py
0 → 100644
浏览文件 @
7b364249
###################################################################
# Copyright (c) 2020 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import
sys
import
taos
from
util.log
import
*
from
util.cases
import
*
from
util.sql
import
*
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
"start to execute %s"
%
__file__
)
tdSql
.
init
(
conn
.
cursor
())
def
singleTable
(
self
):
tdSql
.
execute
(
"create table car(ts timestamp, s int)"
)
tdSql
.
execute
(
"insert into car values('2019-01-01 00:00:00', 1)"
)
tdSql
.
execute
(
"insert into car values('2019-05-13 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car values('2019-12-31 23:59:59', 1)"
)
tdSql
.
execute
(
"insert into car values('2020-01-01 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car values('2020-01-02 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car values('2020-01-03 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car values('2020-01-04 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car values('2020-01-05 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car values('2020-01-31 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car values('2020-02-01 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car values('2020-02-02 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car values('2020-02-29 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car values('2020-03-01 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car values('2020-03-02 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car values('2020-03-15 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car values('2020-03-31 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car values('2020-05-01 12:00:00', 1)"
)
tdSql
.
query
(
"select count(*) from car interval(1n)"
)
tdSql
.
checkData
(
0
,
1
,
1
)
tdSql
.
checkData
(
1
,
1
,
1
)
tdSql
.
checkData
(
2
,
1
,
1
)
tdSql
.
checkData
(
3
,
1
,
6
)
tdSql
.
checkData
(
4
,
1
,
3
)
tdSql
.
checkData
(
5
,
1
,
4
)
tdSql
.
checkData
(
6
,
1
,
1
)
tdSql
.
query
(
"select count(*) from car interval(1n) order by ts desc"
)
tdSql
.
checkData
(
6
,
1
,
1
)
tdSql
.
checkData
(
5
,
1
,
1
)
tdSql
.
checkData
(
4
,
1
,
1
)
tdSql
.
checkData
(
3
,
1
,
6
)
tdSql
.
checkData
(
2
,
1
,
3
)
tdSql
.
checkData
(
1
,
1
,
4
)
tdSql
.
checkData
(
0
,
1
,
1
)
tdSql
.
query
(
"select count(*) from car interval(2n)"
)
tdSql
.
checkData
(
0
,
1
,
1
)
tdSql
.
checkData
(
1
,
1
,
1
)
tdSql
.
checkData
(
2
,
1
,
1
)
tdSql
.
checkData
(
3
,
1
,
9
)
tdSql
.
checkData
(
4
,
1
,
4
)
tdSql
.
checkData
(
5
,
1
,
1
)
tdSql
.
query
(
"select count(*) from car interval(2n) order by ts desc"
)
tdSql
.
checkData
(
5
,
1
,
1
)
tdSql
.
checkData
(
4
,
1
,
1
)
tdSql
.
checkData
(
3
,
1
,
1
)
tdSql
.
checkData
(
2
,
1
,
9
)
tdSql
.
checkData
(
1
,
1
,
4
)
tdSql
.
checkData
(
0
,
1
,
1
)
tdSql
.
query
(
"select count(*) from car interval(1y)"
)
tdSql
.
checkData
(
0
,
1
,
3
)
tdSql
.
checkData
(
1
,
1
,
14
)
tdSql
.
query
(
"select count(*) from car interval(2y)"
)
tdSql
.
checkData
(
0
,
1
,
3
)
tdSql
.
checkData
(
1
,
1
,
14
)
def
superTable
(
self
):
tdSql
.
execute
(
"create table cars(ts timestamp, s int) tags(id int)"
)
tdSql
.
execute
(
"create table car0 using cars tags(0)"
)
tdSql
.
execute
(
"create table car1 using cars tags(0)"
)
tdSql
.
execute
(
"create table car2 using cars tags(0)"
)
tdSql
.
execute
(
"create table car3 using cars tags(0)"
)
tdSql
.
execute
(
"create table car4 using cars tags(0)"
)
tdSql
.
execute
(
"insert into car0 values('2019-01-01 00:00:00', 1)"
)
tdSql
.
execute
(
"insert into car1 values('2019-05-13 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car2 values('2019-12-31 23:59:59', 1)"
)
tdSql
.
execute
(
"insert into car1 values('2020-01-01 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car1 values('2020-01-02 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car1 values('2020-01-03 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car1 values('2020-01-04 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car1 values('2020-01-05 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car1 values('2020-01-31 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car1 values('2020-02-01 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car2 values('2020-02-02 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car2 values('2020-02-29 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car3 values('2020-03-01 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car3 values('2020-03-02 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car3 values('2020-03-15 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car4 values('2020-03-31 12:00:00', 1)"
)
tdSql
.
execute
(
"insert into car3 values('2020-05-01 12:00:00', 1)"
)
tdSql
.
query
(
"select count(*) from cars interval(1n)"
)
tdSql
.
checkData
(
0
,
1
,
1
)
tdSql
.
checkData
(
1
,
1
,
1
)
tdSql
.
checkData
(
2
,
1
,
1
)
tdSql
.
checkData
(
3
,
1
,
6
)
tdSql
.
checkData
(
4
,
1
,
3
)
tdSql
.
checkData
(
5
,
1
,
4
)
tdSql
.
checkData
(
6
,
1
,
1
)
tdSql
.
query
(
"select count(*) from cars interval(1n) order by ts desc"
)
tdSql
.
checkData
(
6
,
1
,
1
)
tdSql
.
checkData
(
5
,
1
,
1
)
tdSql
.
checkData
(
4
,
1
,
1
)
tdSql
.
checkData
(
3
,
1
,
6
)
tdSql
.
checkData
(
2
,
1
,
3
)
tdSql
.
checkData
(
1
,
1
,
4
)
tdSql
.
checkData
(
0
,
1
,
1
)
tdSql
.
query
(
"select count(*) from cars interval(2n)"
)
tdSql
.
checkData
(
0
,
1
,
1
)
tdSql
.
checkData
(
1
,
1
,
1
)
tdSql
.
checkData
(
2
,
1
,
1
)
tdSql
.
checkData
(
3
,
1
,
9
)
tdSql
.
checkData
(
4
,
1
,
4
)
tdSql
.
checkData
(
5
,
1
,
1
)
tdSql
.
query
(
"select count(*) from cars interval(2n) order by ts desc"
)
tdSql
.
checkData
(
5
,
1
,
1
)
tdSql
.
checkData
(
4
,
1
,
1
)
tdSql
.
checkData
(
3
,
1
,
1
)
tdSql
.
checkData
(
2
,
1
,
9
)
tdSql
.
checkData
(
1
,
1
,
4
)
tdSql
.
checkData
(
0
,
1
,
1
)
tdSql
.
query
(
"select count(*) from cars interval(1y)"
)
tdSql
.
checkData
(
0
,
1
,
3
)
tdSql
.
checkData
(
1
,
1
,
14
)
tdSql
.
query
(
"select count(*) from cars interval(2y)"
)
tdSql
.
checkData
(
0
,
1
,
3
)
tdSql
.
checkData
(
1
,
1
,
14
)
def
run
(
self
):
tdSql
.
prepare
()
self
.
singleTable
()
self
.
superTable
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
"%s successfully executed"
%
__file__
)
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录