Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
91ced4c1
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
91ced4c1
编写于
7月 07, 2020
作者:
B
Bomin Zhang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
set end time of stream queries properly
上级
2fc2feb8
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
54 addition
and
49 deletion
+54
-49
src/client/src/tscStream.c
src/client/src/tscStream.c
+54
-49
未找到文件。
src/client/src/tscStream.c
浏览文件 @
91ced4c1
...
...
@@ -121,7 +121,20 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
pQueryInfo
->
window
.
ekey
=
pStream
->
etime
;
}
}
else
{
pQueryInfo
->
window
.
skey
=
pStream
->
stime
-
pStream
->
interval
;
pQueryInfo
->
window
.
skey
=
pStream
->
stime
;
// - pStream->interval;
int64_t
etime
=
taosGetTimestamp
(
pStream
->
precision
);
// delay to wait all data in last time window
if
(
pStream
->
precision
==
TSDB_TIME_PRECISION_MICRO
)
{
etime
-=
tsMaxStreamComputDelay
*
1000l
;
}
else
{
etime
-=
tsMaxStreamComputDelay
;
}
if
(
etime
>
pStream
->
etime
)
{
etime
=
pStream
->
etime
;
}
else
{
etime
=
pStream
->
stime
+
(
etime
-
pStream
->
stime
)
/
pStream
->
interval
*
pStream
->
interval
;
}
pQueryInfo
->
window
.
ekey
=
etime
;
}
// launch stream computing in a new thread
...
...
@@ -151,17 +164,45 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
taos_fetch_rows_a
(
tres
,
tscProcessStreamRetrieveResult
,
param
);
}
static
void
tscSetTimestampForRes
(
SSqlStream
*
pStream
,
SSqlObj
*
pSql
)
{
SSqlRes
*
pRes
=
&
pSql
->
res
;
// no need to be called as this is alreay done in the query
static
void
tscStreamFillTimeGap
(
SSqlStream
*
pStream
,
TSKEY
ts
)
{
#if 0
SSqlObj * pSql = pStream->pSql;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (pQueryInfo->fillType != TSDB_FILL_SET_VALUE && pQueryInfo->fillType != TSDB_FILL_NULL) {
return;
}
int64_t
timestamp
=
*
(
int64_t
*
)
pRes
->
data
;
int64_t
actualTimestamp
=
pStream
->
stime
-
pStream
->
interval
;
SSqlRes *pRes = &pSql->res;
/* failed to retrieve any result in this retrieve */
pSql->res.numOfRows = 1;
void *row[TSDB_MAX_COLUMNS] = {0};
char tmpRes[TSDB_MAX_BYTES_PER_ROW] = {0};
void *oldPtr = pSql->res.data;
pSql->res.data = tmpRes;
int32_t rowNum = 0;
while (pStream->stime + pStream->slidingTime < ts) {
pStream->stime += pStream->slidingTime;
*(TSKEY*)row[0] = pStream->stime;
for (int32_t i = 1; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
assignVal(pSql->res.data + offset, (char *)(&pQueryInfo->fillVal[i]), pField->bytes, pField->type);
row[i] = pSql->res.data + offset;
}
(*pStream->fp)(pStream->param, pSql, row);
++rowNum;
}
if
(
timestamp
!=
actualTimestamp
)
{
// reset the timestamp of each agg point by using start time of each interval
*
((
int64_t
*
)
pRes
->
data
)
=
actualTimestamp
;
tscWarn
(
"%p stream:%p, timestamp of points is:%"
PRId64
", reset to %"
PRId64
,
pSql
,
pStream
,
timestamp
,
actualTimestamp
);
if (rowNum > 0) {
tscDebug("%p stream:%p %d rows padded", pSql, pStream, rowNum);
}
pRes->numOfRows = 0;
pRes->data = oldPtr;
#endif
}
static
void
tscProcessStreamRetrieveResult
(
void
*
param
,
TAOS_RES
*
res
,
int
numOfRows
)
{
...
...
@@ -180,10 +221,10 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
if
(
numOfRows
>
0
)
{
// when reaching here the first execution of stream computing is successful.
pStream
->
numOfRes
+=
numOfRows
;
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
++
i
)
{
TAOS_ROW
row
=
taos_fetch_row
(
res
);
tscDebug
(
"%p stream:%p fetch result"
,
pSql
,
pStream
);
tscStreamFillTimeGap
(
pStream
,
*
(
TSKEY
*
)
row
[
0
]);
pStream
->
stime
=
*
(
TSKEY
*
)
row
[
0
];
// user callback function
...
...
@@ -194,40 +235,8 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
taos_fetch_rows_a
(
res
,
tscProcessStreamRetrieveResult
,
pStream
);
}
else
{
// numOfRows == 0, all data has been retrieved
pStream
->
useconds
+=
pSql
->
res
.
useconds
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
if
(
pStream
->
numOfRes
==
0
)
{
if
(
pQueryInfo
->
fillType
==
TSDB_FILL_SET_VALUE
||
pQueryInfo
->
fillType
==
TSDB_FILL_NULL
)
{
SSqlRes
*
pRes
=
&
pSql
->
res
;
/* failed to retrieve any result in this retrieve */
pSql
->
res
.
numOfRows
=
1
;
void
*
row
[
TSDB_MAX_COLUMNS
]
=
{
0
};
char
tmpRes
[
TSDB_MAX_BYTES_PER_ROW
]
=
{
0
};
void
*
oldPtr
=
pSql
->
res
.
data
;
pSql
->
res
.
data
=
tmpRes
;
for
(
int32_t
i
=
1
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
++
i
)
{
int16_t
offset
=
tscFieldInfoGetOffset
(
pQueryInfo
,
i
);
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fieldsInfo
,
i
);
assignVal
(
pSql
->
res
.
data
+
offset
,
(
char
*
)(
&
pQueryInfo
->
fillVal
[
i
]),
pField
->
bytes
,
pField
->
type
);
row
[
i
]
=
pSql
->
res
.
data
+
offset
;
}
tscSetTimestampForRes
(
pStream
,
pSql
);
row
[
0
]
=
pRes
->
data
;
tscDebug
(
"%p stream:%p fetch result"
,
pSql
,
pStream
);
// user callback function
(
*
pStream
->
fp
)(
pStream
->
param
,
res
,
row
);
pRes
->
numOfRows
=
0
;
pRes
->
data
=
oldPtr
;
}
else
if
(
pStream
->
isProject
)
{
if
(
pStream
->
isProject
)
{
/* no resuls in the query range, retry */
// todo set retry dynamic time
int32_t
retry
=
tsProjectExecInterval
;
...
...
@@ -236,10 +245,8 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
tscSetRetryTimer
(
pStream
,
pStream
->
pSql
,
retry
);
return
;
}
}
else
{
if
(
pStream
->
isProject
)
{
pStream
->
stime
+=
1
;
}
}
else
if
(
pStream
->
isProject
)
{
pStream
->
stime
+=
1
;
}
tscDebug
(
"%p stream:%p, query on:%s, fetch result completed, fetched rows:%"
PRId64
,
pSql
,
pStream
,
pTableMetaInfo
->
name
,
...
...
@@ -421,8 +428,6 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
}
else
{
// timewindow based aggregation stream
if
(
stime
==
0
)
{
// no data in meter till now
stime
=
((
int64_t
)
taosGetTimestamp
(
pStream
->
precision
)
/
pStream
->
interval
)
*
pStream
->
interval
;
const
char
*
fmtts
(
int64_t
);
printf
(
"stream start time is: %s
\n
"
,
fmtts
(
stime
));
tscWarn
(
"%p stream:%p, last timestamp:0, reset to:%"
PRId64
,
pSql
,
pStream
,
stime
);
}
else
{
int64_t
newStime
=
(
stime
/
pStream
->
interval
)
*
pStream
->
interval
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录