Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a9360afa
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
a9360afa
编写于
1月 15, 2021
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[TD-2635]<feature>: support next fill option.
上级
b15eb994
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
295 addition
and
273 deletion
+295
-273
src/client/src/tscLocalMerge.c
src/client/src/tscLocalMerge.c
+5
-6
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+2
-0
src/common/src/ttypes.c
src/common/src/ttypes.c
+6
-2
src/inc/taosmsg.h
src/inc/taosmsg.h
+19
-18
src/query/inc/qFill.h
src/query/inc/qFill.h
+2
-2
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+6
-8
src/query/src/qFill.c
src/query/src/qFill.c
+255
-237
未找到文件。
src/client/src/tscLocalMerge.c
浏览文件 @
a9360afa
...
...
@@ -979,14 +979,13 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
pQueryInfo
->
limit
.
offset
-=
newRows
;
pRes
->
numOfRows
=
0
;
int32_t
rpoints
=
taosNumOfRemainRows
(
pFillInfo
);
if
(
rpoints
<=
0
)
{
if
(
!
taosFillHasMoreResults
(
pFillInfo
))
{
if
(
!
doneOutput
)
{
// reduce procedure has not completed yet, but current results for fill are exhausted
break
;
}
// all output in current group are completed
int32_t
totalRemainRows
=
(
int32_t
)
getNumOfRes
WithFill
(
pFillInfo
,
actualETime
,
pLocalReducer
->
resColModel
->
capacity
);
int32_t
totalRemainRows
=
(
int32_t
)
getNumOfRes
ultsAfterFillGap
(
pFillInfo
,
actualETime
,
pLocalReducer
->
resColModel
->
capacity
);
if
(
totalRemainRows
<=
0
)
{
break
;
}
...
...
@@ -1337,14 +1336,14 @@ static bool doBuildFilledResultForGroup(SSqlObj *pSql) {
SLocalReducer
*
pLocalReducer
=
pRes
->
pLocalReducer
;
SFillInfo
*
pFillInfo
=
pLocalReducer
->
pFillInfo
;
if
(
pFillInfo
!=
NULL
&&
taos
NumOfRemainRows
(
pFillInfo
)
>
0
)
{
if
(
pFillInfo
!=
NULL
&&
taos
FillHasMoreResults
(
pFillInfo
)
)
{
assert
(
pQueryInfo
->
fillType
!=
TSDB_FILL_NONE
);
tFilePage
*
pFinalDataBuf
=
pLocalReducer
->
pResultBuf
;
int64_t
etime
=
*
(
int64_t
*
)(
pFinalDataBuf
->
data
+
TSDB_KEYSIZE
*
(
pFillInfo
->
numOfRows
-
1
));
// the first column must be the timestamp column
int32_t
rows
=
(
int32_t
)
getNumOfRes
WithFill
(
pFillInfo
,
etime
,
pLocalReducer
->
resColModel
->
capacity
);
int32_t
rows
=
(
int32_t
)
getNumOfRes
ultsAfterFillGap
(
pFillInfo
,
etime
,
pLocalReducer
->
resColModel
->
capacity
);
if
(
rows
>
0
)
{
// do fill gap
doFillResult
(
pSql
,
pLocalReducer
,
false
);
}
...
...
@@ -1373,7 +1372,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
((
pRes
->
numOfRowsGroup
<
pQueryInfo
->
limit
.
limit
&&
pQueryInfo
->
limit
.
limit
>
0
)
||
(
pQueryInfo
->
limit
.
limit
<
0
)))
{
int64_t
etime
=
(
pQueryInfo
->
order
.
order
==
TSDB_ORDER_ASC
)
?
pQueryInfo
->
window
.
ekey
:
pQueryInfo
->
window
.
skey
;
int32_t
rows
=
(
int32_t
)
getNumOfRes
WithFill
(
pFillInfo
,
etime
,
pLocalReducer
->
resColModel
->
capacity
);
int32_t
rows
=
(
int32_t
)
getNumOfRes
ultsAfterFillGap
(
pFillInfo
,
etime
,
pLocalReducer
->
resColModel
->
capacity
);
if
(
rows
>
0
)
{
doFillResult
(
pSql
,
pLocalReducer
,
true
);
}
...
...
src/client/src/tscSQLParser.c
浏览文件 @
a9360afa
...
...
@@ -4514,6 +4514,8 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery
}
}
else
if
(
strncasecmp
(
pItem
->
pVar
.
pz
,
"prev"
,
4
)
==
0
&&
pItem
->
pVar
.
nLen
==
4
)
{
pQueryInfo
->
fillType
=
TSDB_FILL_PREV
;
}
else
if
(
strncasecmp
(
pItem
->
pVar
.
pz
,
"next"
,
4
)
==
0
&&
pItem
->
pVar
.
nLen
==
4
)
{
pQueryInfo
->
fillType
=
TSDB_FILL_NEXT
;
}
else
if
(
strncasecmp
(
pItem
->
pVar
.
pz
,
"linear"
,
6
)
==
0
&&
pItem
->
pVar
.
nLen
==
6
)
{
pQueryInfo
->
fillType
=
TSDB_FILL_LINEAR
;
}
else
if
(
strncasecmp
(
pItem
->
pVar
.
pz
,
"value"
,
5
)
==
0
&&
pItem
->
pVar
.
nLen
==
5
)
{
...
...
src/common/src/ttypes.c
浏览文件 @
a9360afa
...
...
@@ -524,15 +524,18 @@ void assignVal(char *val, const char *src, int32_t len, int32_t type) {
switch
(
type
)
{
case
TSDB_DATA_TYPE_BOOL
:
case
TSDB_DATA_TYPE_TINYINT
:
case
TSDB_DATA_TYPE_UTINYINT
:
*
((
int8_t
*
)
val
)
=
GET_INT8_VAL
(
src
);
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
case
TSDB_DATA_TYPE_USMALLINT
:
*
((
int16_t
*
)
val
)
=
GET_INT16_VAL
(
src
);
break
;
case
TSDB_DATA_TYPE_INT
:
{
case
TSDB_DATA_TYPE_INT
:
case
TSDB_DATA_TYPE_UINT
:
*
((
int32_t
*
)
val
)
=
GET_INT32_VAL
(
src
);
break
;
}
case
TSDB_DATA_TYPE_FLOAT
:
SET_FLOAT_VAL
(
val
,
GET_FLOAT_VAL
(
src
));
break
;
...
...
@@ -540,6 +543,7 @@ void assignVal(char *val, const char *src, int32_t len, int32_t type) {
SET_DOUBLE_VAL
(
val
,
GET_DOUBLE_VAL
(
src
));
break
;
case
TSDB_DATA_TYPE_BIGINT
:
case
TSDB_DATA_TYPE_UBIGINT
:
case
TSDB_DATA_TYPE_TIMESTAMP
:
*
((
int64_t
*
)
val
)
=
GET_INT64_VAL
(
src
);
break
;
...
...
src/inc/taosmsg.h
浏览文件 @
a9360afa
...
...
@@ -153,30 +153,31 @@ enum _mgmt_table {
#define TSDB_ALTER_TABLE_DROP_COLUMN 6
#define TSDB_ALTER_TABLE_CHANGE_COLUMN 7
#define TSDB_FILL_NONE 0
#define TSDB_FILL_NULL 1
#define TSDB_FILL_SET_VALUE 2
#define TSDB_FILL_LINEAR 3
#define TSDB_FILL_PREV 4
#define TSDB_ALTER_USER_PASSWD 0x1
#define TSDB_FILL_NONE 0
#define TSDB_FILL_NULL 1
#define TSDB_FILL_SET_VALUE 2
#define TSDB_FILL_LINEAR 3
#define TSDB_FILL_PREV 4
#define TSDB_FILL_NEXT 5
#define TSDB_ALTER_USER_PASSWD 0x1
#define TSDB_ALTER_USER_PRIVILEGES 0x2
#define TSDB_KILL_MSG_LEN 30
#define TSDB_KILL_MSG_LEN
30
#define TSDB_VN_READ_ACCCESS ((char)0x1)
#define TSDB_VN_WRITE_ACCCESS ((char)0x2)
#define TSDB_VN_READ_ACCCESS
((char)0x1)
#define TSDB_VN_WRITE_ACCCESS
((char)0x2)
#define TSDB_VN_ALL_ACCCESS (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS)
#define TSDB_COL_NORMAL 0x0u // the normal column of the table
#define TSDB_COL_TAG 0x1u // the tag column type
#define TSDB_COL_UDC 0x2u // the user specified normal string column, it is a dummy column
#define TSDB_COL_NULL 0x4u // the column filter NULL or not
#define TSDB_COL_NORMAL
0x0u // the normal column of the table
#define TSDB_COL_TAG
0x1u // the tag column type
#define TSDB_COL_UDC
0x2u // the user specified normal string column, it is a dummy column
#define TSDB_COL_NULL
0x4u // the column filter NULL or not
#define TSDB_COL_IS_TAG(f) (((f&(~(TSDB_COL_NULL)))&TSDB_COL_TAG) != 0)
#define TSDB_COL_IS_NORMAL_COL(f)
((f&(~(TSDB_COL_NULL))) == TSDB_COL_NORMAL)
#define TSDB_COL_IS_UD_COL(f) ((f&(~(TSDB_COL_NULL))) == TSDB_COL_UDC)
#define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0)
#define TSDB_COL_IS_TAG(f)
(((f&(~(TSDB_COL_NULL)))&TSDB_COL_TAG) != 0)
#define TSDB_COL_IS_NORMAL_COL(f) ((f&(~(TSDB_COL_NULL))) == TSDB_COL_NORMAL)
#define TSDB_COL_IS_UD_COL(f)
((f&(~(TSDB_COL_NULL))) == TSDB_COL_UDC)
#define TSDB_COL_REQ_NULL(f)
(((f)&TSDB_COL_NULL) != 0)
extern
char
*
taosMsg
[];
...
...
src/query/inc/qFill.h
浏览文件 @
a9360afa
...
...
@@ -82,9 +82,9 @@ void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, const tFilePage** p
void
taosFillCopyInputDataFromOneFilePage
(
SFillInfo
*
pFillInfo
,
const
tFilePage
*
pInput
);
int64_t
getNumOfResWithFill
(
SFillInfo
*
pFillInfo
,
int64_t
ekey
,
int32_t
maxNumOfRows
);
bool
taosFillHasMoreResults
(
SFillInfo
*
pFillInfo
);
int
32_t
taosNumOfRemainRows
(
SFillInfo
*
pFillInfo
);
int
64_t
getNumOfResultsAfterFillGap
(
SFillInfo
*
pFillInfo
,
int64_t
ekey
,
int32_t
maxNumOfRows
);
int32_t
taosGetLinearInterpolationVal
(
int32_t
type
,
SPoint
*
point1
,
SPoint
*
point2
,
SPoint
*
point
);
...
...
src/query/src/qExecutor.c
浏览文件 @
a9360afa
...
...
@@ -4176,7 +4176,7 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc
}
}
bool
queryHasRemainResForTableQuery
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
bool
hasNotReturnedResults
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SFillInfo
*
pFillInfo
=
pRuntimeEnv
->
pFillInfo
;
...
...
@@ -4186,8 +4186,7 @@ bool queryHasRemainResForTableQuery(SQueryRuntimeEnv* pRuntimeEnv) {
if
(
pQuery
->
fillType
!=
TSDB_FILL_NONE
&&
!
isPointInterpoQuery
(
pQuery
))
{
// There are results not returned to client yet, so filling applied to the remain result is required firstly.
int32_t
remain
=
taosNumOfRemainRows
(
pFillInfo
);
if
(
remain
>
0
)
{
if
(
taosFillHasMoreResults
(
pFillInfo
))
{
return
true
;
}
...
...
@@ -4201,7 +4200,7 @@ bool queryHasRemainResForTableQuery(SQueryRuntimeEnv* pRuntimeEnv) {
* first result row in the actual result set will fill nothing.
*/
if
(
Q_STATUS_EQUAL
(
pQuery
->
status
,
QUERY_COMPLETED
))
{
int32_t
numOfTotal
=
(
int32_t
)
getNumOfRes
WithFill
(
pFillInfo
,
pQuery
->
window
.
ekey
,
(
int32_t
)
pQuery
->
rec
.
capacity
);
int32_t
numOfTotal
=
(
int32_t
)
getNumOfRes
ultsAfterFillGap
(
pFillInfo
,
pQuery
->
window
.
ekey
,
(
int32_t
)
pQuery
->
rec
.
capacity
);
return
numOfTotal
>
0
;
}
...
...
@@ -4269,7 +4268,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
setQueryStatus
(
pQuery
,
QUERY_OVER
);
}
}
else
{
if
(
!
queryHasRemainResForTableQuery
(
&
pQInfo
->
runtimeEnv
))
{
if
(
!
hasNotReturnedResults
(
&
pQInfo
->
runtimeEnv
))
{
setQueryStatus
(
pQuery
,
QUERY_OVER
);
}
}
...
...
@@ -4296,7 +4295,6 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int
pQInfo
,
pFillInfo
->
numOfRows
,
ret
,
pQuery
->
limit
.
offset
,
ret
-
pQuery
->
limit
.
offset
,
0
);
ret
-=
(
int32_t
)
pQuery
->
limit
.
offset
;
// todo !!!!there exactly number of interpo is not valid.
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
memmove
(
pDst
[
i
]
->
data
,
pDst
[
i
]
->
data
+
pQuery
->
pExpr1
[
i
].
bytes
*
pQuery
->
limit
.
offset
,
ret
*
pQuery
->
pExpr1
[
i
].
bytes
);
...
...
@@ -4314,7 +4312,7 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int
ret
=
0
;
}
if
(
!
queryHasRemainResForTableQuery
(
pRuntimeEnv
))
{
if
(
!
hasNotReturnedResults
(
pRuntimeEnv
))
{
return
ret
;
}
}
...
...
@@ -5774,7 +5772,7 @@ static void tableQueryImpl(SQInfo *pQInfo) {
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
if
(
queryHasRemainResForTableQuery
(
pRuntimeEnv
))
{
if
(
hasNotReturnedResults
(
pRuntimeEnv
))
{
if
(
pQuery
->
fillType
!=
TSDB_FILL_NONE
)
{
/*
* There are remain results that are not returned due to result interpolation
...
...
src/query/src/qFill.c
浏览文件 @
a9360afa
...
...
@@ -25,237 +25,8 @@
#include "queryLog.h"
#define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC)
// there are no duplicated tags in the SFillTagColInfo list
static
int32_t
setTagColumnInfo
(
SFillInfo
*
pFillInfo
,
int32_t
numOfCols
,
int32_t
capacity
)
{
int32_t
rowsize
=
0
;
int32_t
k
=
0
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SFillColInfo
*
pColInfo
=
&
pFillInfo
->
pFillCol
[
i
];
pFillInfo
->
pData
[
i
]
=
calloc
(
1
,
pColInfo
->
col
.
bytes
*
capacity
);
if
(
TSDB_COL_IS_TAG
(
pColInfo
->
flag
))
{
bool
exists
=
false
;
int32_t
index
=
-
1
;
for
(
int32_t
j
=
0
;
j
<
k
;
++
j
)
{
if
(
pFillInfo
->
pTags
[
j
].
col
.
colId
==
pColInfo
->
col
.
colId
)
{
exists
=
true
;
index
=
j
;
break
;
}
}
if
(
!
exists
)
{
SSchema
*
pSchema
=
&
pFillInfo
->
pTags
[
k
].
col
;
pSchema
->
colId
=
pColInfo
->
col
.
colId
;
pSchema
->
type
=
pColInfo
->
col
.
type
;
pSchema
->
bytes
=
pColInfo
->
col
.
bytes
;
pFillInfo
->
pTags
[
k
].
tagVal
=
calloc
(
1
,
pColInfo
->
col
.
bytes
);
pColInfo
->
tagIndex
=
k
;
k
+=
1
;
}
else
{
pColInfo
->
tagIndex
=
index
;
}
}
rowsize
+=
pColInfo
->
col
.
bytes
;
}
assert
(
k
<=
pFillInfo
->
numOfTags
);
return
rowsize
;
}
SFillInfo
*
taosInitFillInfo
(
int32_t
order
,
TSKEY
skey
,
int32_t
numOfTags
,
int32_t
capacity
,
int32_t
numOfCols
,
int64_t
slidingTime
,
int8_t
slidingUnit
,
int8_t
precision
,
int32_t
fillType
,
SFillColInfo
*
pCol
,
void
*
handle
)
{
if
(
fillType
==
TSDB_FILL_NONE
)
{
return
NULL
;
}
SFillInfo
*
pFillInfo
=
calloc
(
1
,
sizeof
(
SFillInfo
));
taosResetFillInfo
(
pFillInfo
,
skey
);
pFillInfo
->
order
=
order
;
pFillInfo
->
type
=
fillType
;
pFillInfo
->
pFillCol
=
pCol
;
pFillInfo
->
numOfTags
=
numOfTags
;
pFillInfo
->
numOfCols
=
numOfCols
;
pFillInfo
->
precision
=
precision
;
pFillInfo
->
alloc
=
capacity
;
pFillInfo
->
handle
=
handle
;
pFillInfo
->
interval
.
interval
=
slidingTime
;
pFillInfo
->
interval
.
intervalUnit
=
slidingUnit
;
pFillInfo
->
interval
.
sliding
=
slidingTime
;
pFillInfo
->
interval
.
slidingUnit
=
slidingUnit
;
pFillInfo
->
pData
=
malloc
(
POINTER_BYTES
*
numOfCols
);
if
(
numOfTags
>
0
)
{
pFillInfo
->
pTags
=
calloc
(
pFillInfo
->
numOfTags
,
sizeof
(
SFillTagColInfo
));
for
(
int32_t
i
=
0
;
i
<
numOfTags
;
++
i
)
{
pFillInfo
->
pTags
[
i
].
col
.
colId
=
-
2
;
// TODO
}
}
pFillInfo
->
rowSize
=
setTagColumnInfo
(
pFillInfo
,
pFillInfo
->
numOfCols
,
pFillInfo
->
alloc
);
assert
(
pFillInfo
->
rowSize
>
0
);
return
pFillInfo
;
}
void
taosResetFillInfo
(
SFillInfo
*
pFillInfo
,
TSKEY
startTimestamp
)
{
pFillInfo
->
start
=
startTimestamp
;
pFillInfo
->
currentKey
=
startTimestamp
;
pFillInfo
->
index
=
-
1
;
pFillInfo
->
numOfRows
=
0
;
pFillInfo
->
numOfCurrent
=
0
;
pFillInfo
->
numOfTotal
=
0
;
}
void
*
taosDestroyFillInfo
(
SFillInfo
*
pFillInfo
)
{
if
(
pFillInfo
==
NULL
)
{
return
NULL
;
}
tfree
(
pFillInfo
->
prevValues
);
tfree
(
pFillInfo
->
nextValues
);
tfree
(
pFillInfo
->
pTags
);
for
(
int32_t
i
=
0
;
i
<
pFillInfo
->
numOfCols
;
++
i
)
{
tfree
(
pFillInfo
->
pData
[
i
]);
}
tfree
(
pFillInfo
->
pData
);
tfree
(
pFillInfo
->
pFillCol
);
tfree
(
pFillInfo
);
return
NULL
;
}
void
taosFillSetStartInfo
(
SFillInfo
*
pFillInfo
,
int32_t
numOfRows
,
TSKEY
endKey
)
{
if
(
pFillInfo
->
type
==
TSDB_FILL_NONE
)
{
return
;
}
pFillInfo
->
end
=
endKey
;
if
(
!
FILL_IS_ASC_FILL
(
pFillInfo
))
{
pFillInfo
->
end
=
taosTimeTruncate
(
endKey
,
&
pFillInfo
->
interval
,
pFillInfo
->
precision
);
}
pFillInfo
->
index
=
0
;
pFillInfo
->
numOfRows
=
numOfRows
;
// ensure the space
if
(
pFillInfo
->
alloc
<
numOfRows
)
{
for
(
int32_t
i
=
0
;
i
<
pFillInfo
->
numOfCols
;
++
i
)
{
char
*
tmp
=
realloc
(
pFillInfo
->
pData
[
i
],
numOfRows
*
pFillInfo
->
pFillCol
[
i
].
col
.
bytes
);
assert
(
tmp
!=
NULL
);
// todo handle error
memset
(
tmp
,
0
,
numOfRows
*
pFillInfo
->
pFillCol
[
i
].
col
.
bytes
);
pFillInfo
->
pData
[
i
]
=
tmp
;
}
}
}
// copy the data into source data buffer
void
taosFillCopyInputDataFromFilePage
(
SFillInfo
*
pFillInfo
,
const
tFilePage
**
pInput
)
{
for
(
int32_t
i
=
0
;
i
<
pFillInfo
->
numOfCols
;
++
i
)
{
memcpy
(
pFillInfo
->
pData
[
i
],
pInput
[
i
]
->
data
,
pFillInfo
->
numOfRows
*
pFillInfo
->
pFillCol
[
i
].
col
.
bytes
);
}
}
void
taosFillCopyInputDataFromOneFilePage
(
SFillInfo
*
pFillInfo
,
const
tFilePage
*
pInput
)
{
assert
(
pFillInfo
->
numOfRows
==
pInput
->
num
);
for
(
int32_t
i
=
0
;
i
<
pFillInfo
->
numOfCols
;
++
i
)
{
SFillColInfo
*
pCol
=
&
pFillInfo
->
pFillCol
[
i
];
const
char
*
data
=
pInput
->
data
+
pCol
->
col
.
offset
*
pInput
->
num
;
memcpy
(
pFillInfo
->
pData
[
i
],
data
,
(
size_t
)(
pInput
->
num
*
pCol
->
col
.
bytes
));
if
(
TSDB_COL_IS_TAG
(
pCol
->
flag
))
{
// copy the tag value to tag value buffer
SFillTagColInfo
*
pTag
=
&
pFillInfo
->
pTags
[
pCol
->
tagIndex
];
assert
(
pTag
->
col
.
colId
==
pCol
->
col
.
colId
);
memcpy
(
pTag
->
tagVal
,
data
,
pCol
->
col
.
bytes
);
}
}
}
int64_t
getNumOfResWithFill
(
SFillInfo
*
pFillInfo
,
TSKEY
ekey
,
int32_t
maxNumOfRows
)
{
int64_t
*
tsList
=
(
int64_t
*
)
pFillInfo
->
pData
[
0
];
int32_t
numOfRows
=
taosNumOfRemainRows
(
pFillInfo
);
TSKEY
ekey1
=
ekey
;
if
(
!
FILL_IS_ASC_FILL
(
pFillInfo
))
{
pFillInfo
->
end
=
taosTimeTruncate
(
ekey
,
&
pFillInfo
->
interval
,
pFillInfo
->
precision
);
}
int64_t
numOfRes
=
-
1
;
if
(
numOfRows
>
0
)
{
// still fill gap within current data block, not generating data after the result set.
TSKEY
lastKey
=
tsList
[
pFillInfo
->
numOfRows
-
1
];
numOfRes
=
taosTimeCountInterval
(
lastKey
,
pFillInfo
->
currentKey
,
pFillInfo
->
interval
.
sliding
,
pFillInfo
->
interval
.
slidingUnit
,
pFillInfo
->
precision
);
numOfRes
+=
1
;
assert
(
numOfRes
>=
numOfRows
);
}
else
{
// reach the end of data
if
((
ekey1
<
pFillInfo
->
currentKey
&&
FILL_IS_ASC_FILL
(
pFillInfo
))
||
(
ekey1
>
pFillInfo
->
currentKey
&&
!
FILL_IS_ASC_FILL
(
pFillInfo
)))
{
return
0
;
}
numOfRes
=
taosTimeCountInterval
(
ekey1
,
pFillInfo
->
currentKey
,
pFillInfo
->
interval
.
sliding
,
pFillInfo
->
interval
.
slidingUnit
,
pFillInfo
->
precision
);
numOfRes
+=
1
;
}
return
(
numOfRes
>
maxNumOfRows
)
?
maxNumOfRows
:
numOfRes
;
}
int32_t
taosNumOfRemainRows
(
SFillInfo
*
pFillInfo
)
{
if
(
pFillInfo
->
numOfRows
==
0
||
(
pFillInfo
->
numOfRows
>
0
&&
pFillInfo
->
index
>=
pFillInfo
->
numOfRows
))
{
return
0
;
}
return
pFillInfo
->
numOfRows
-
pFillInfo
->
index
;
}
#define DO_INTERPOLATION(_v1, _v2, _k1, _k2, _k) ((_v1) + ((_v2) - (_v1)) * (((double)(_k)) - ((double)(_k1))) / (((double)(_k2)) - ((double)(_k1))))
int32_t
taosGetLinearInterpolationVal
(
int32_t
type
,
SPoint
*
point1
,
SPoint
*
point2
,
SPoint
*
point
)
{
double
v1
=
-
1
;
double
v2
=
-
1
;
GET_TYPED_DATA
(
v1
,
double
,
type
,
point1
->
val
);
GET_TYPED_DATA
(
v2
,
double
,
type
,
point2
->
val
);
double
r
=
DO_INTERPOLATION
(
v1
,
v2
,
point1
->
key
,
point2
->
key
,
point
->
key
);
switch
(
type
)
{
case
TSDB_DATA_TYPE_TINYINT
:
*
(
int8_t
*
)
point
->
val
=
(
int8_t
)
r
;
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
*
(
int16_t
*
)
point
->
val
=
(
int16_t
)
r
;
break
;
case
TSDB_DATA_TYPE_INT
:
*
(
int32_t
*
)
point
->
val
=
(
int32_t
)
r
;
break
;
case
TSDB_DATA_TYPE_BIGINT
:
*
(
int64_t
*
)
point
->
val
=
(
int64_t
)
r
;
break
;
case
TSDB_DATA_TYPE_DOUBLE
:
*
(
double
*
)
point
->
val
=
(
double
)
r
;
break
;
case
TSDB_DATA_TYPE_FLOAT
:
*
(
float
*
)
point
->
val
=
(
float
)
r
;
break
;
default:
assert
(
0
);
}
return
TSDB_CODE_SUCCESS
;
}
static
void
setTagsValue
(
SFillInfo
*
pFillInfo
,
tFilePage
**
data
,
int32_t
genRows
)
{
for
(
int32_t
j
=
0
;
j
<
pFillInfo
->
numOfCols
;
++
j
)
{
SFillColInfo
*
pCol
=
&
pFillInfo
->
pFillCol
[
j
];
...
...
@@ -298,7 +69,23 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, tFilePage** data, char** sr
// set the other values
if
(
pFillInfo
->
type
==
TSDB_FILL_PREV
)
{
char
*
p
=
FILL_IS_ASC_FILL
(
pFillInfo
)
?
prev
:
next
;
if
(
p
!=
NULL
)
{
for
(
int32_t
i
=
1
;
i
<
pFillInfo
->
numOfCols
;
++
i
)
{
SFillColInfo
*
pCol
=
&
pFillInfo
->
pFillCol
[
i
];
if
(
TSDB_COL_IS_TAG
(
pCol
->
flag
))
{
continue
;
}
char
*
output
=
elePtrAt
(
data
[
i
]
->
data
,
pCol
->
col
.
bytes
,
index
);
assignVal
(
output
,
p
+
pCol
->
col
.
offset
,
pCol
->
col
.
bytes
,
pCol
->
col
.
type
);
}
}
else
{
// no prev value yet, set the value for NULL
setNullValueForRow
(
pFillInfo
,
data
,
pFillInfo
->
numOfCols
,
index
);
}
}
else
if
(
pFillInfo
->
type
==
TSDB_FILL_NEXT
)
{
char
*
p
=
FILL_IS_ASC_FILL
(
pFillInfo
)
?
next
:
prev
;
if
(
p
!=
NULL
)
{
for
(
int32_t
i
=
1
;
i
<
pFillInfo
->
numOfCols
;
++
i
)
{
SFillColInfo
*
pCol
=
&
pFillInfo
->
pFillCol
[
i
];
...
...
@@ -323,7 +110,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, tFilePage** data, char** sr
int16_t
type
=
pCol
->
col
.
type
;
int16_t
bytes
=
pCol
->
col
.
bytes
;
char
*
val1
=
elePtrAt
(
data
[
i
]
->
data
,
pCol
->
col
.
bytes
,
index
);
if
(
type
==
TSDB_DATA_TYPE_BINARY
||
type
==
TSDB_DATA_TYPE_NCHAR
||
type
==
TSDB_DATA_TYPE_BOOL
)
{
setNull
(
val1
,
pCol
->
col
.
type
,
bytes
);
...
...
@@ -359,7 +146,7 @@ static void initBeforeAfterDataBuf(SFillInfo* pFillInfo, char** next) {
if
(
*
next
!=
NULL
)
{
return
;
}
*
next
=
calloc
(
1
,
pFillInfo
->
rowSize
);
for
(
int
i
=
1
;
i
<
pFillInfo
->
numOfCols
;
i
++
)
{
SFillColInfo
*
pCol
=
&
pFillInfo
->
pFillCol
[
i
];
...
...
@@ -393,9 +180,9 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t ou
while
(
pFillInfo
->
numOfCurrent
<
outputRows
)
{
int64_t
ts
=
((
int64_t
*
)
pFillInfo
->
pData
[
0
])[
pFillInfo
->
index
];
// set the next value for interpolation
if
((
pFillInfo
->
currentKey
<
ts
&&
FILL_IS_ASC_FILL
(
pFillInfo
))
||
(
pFillInfo
->
currentKey
>
ts
&&
!
FILL_IS_ASC_FILL
(
pFillInfo
)))
{
/* set the next value for interpolation */
initBeforeAfterDataBuf
(
pFillInfo
,
next
);
copyCurrentRowIntoBuf
(
pFillInfo
,
srcData
,
*
next
);
}
...
...
@@ -449,7 +236,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t ou
setTagsValue
(
pFillInfo
,
data
,
pFillInfo
->
numOfCurrent
);
pFillInfo
->
currentKey
=
taosTimeAdd
(
pFillInfo
->
currentKey
,
pFillInfo
->
interval
.
sliding
*
step
,
pFillInfo
->
interval
.
slidingUnit
,
pFillInfo
->
precision
);
pFillInfo
->
interval
.
slidingUnit
,
pFillInfo
->
precision
);
pFillInfo
->
index
+=
1
;
pFillInfo
->
numOfCurrent
+=
1
;
}
...
...
@@ -468,7 +255,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t ou
return
pFillInfo
->
numOfCurrent
;
}
static
int64_t
fillExternalResults
(
SFillInfo
*
pFillInfo
,
tFilePage
**
output
,
int64_t
resultCapacity
)
{
static
int64_t
appendFilledResult
(
SFillInfo
*
pFillInfo
,
tFilePage
**
output
,
int64_t
resultCapacity
)
{
/*
* These data are generated according to fill strategy, since the current timestamp is out of the time window of
* real result set. Note that we need to keep the direct previous result rows, to generated the filled data.
...
...
@@ -484,15 +271,246 @@ static int64_t fillExternalResults(SFillInfo* pFillInfo, tFilePage** output, int
return
resultCapacity
;
}
// there are no duplicated tags in the SFillTagColInfo list
static
int32_t
setTagColumnInfo
(
SFillInfo
*
pFillInfo
,
int32_t
numOfCols
,
int32_t
capacity
)
{
int32_t
rowsize
=
0
;
int32_t
k
=
0
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SFillColInfo
*
pColInfo
=
&
pFillInfo
->
pFillCol
[
i
];
pFillInfo
->
pData
[
i
]
=
calloc
(
1
,
pColInfo
->
col
.
bytes
*
capacity
);
if
(
TSDB_COL_IS_TAG
(
pColInfo
->
flag
))
{
bool
exists
=
false
;
int32_t
index
=
-
1
;
for
(
int32_t
j
=
0
;
j
<
k
;
++
j
)
{
if
(
pFillInfo
->
pTags
[
j
].
col
.
colId
==
pColInfo
->
col
.
colId
)
{
exists
=
true
;
index
=
j
;
break
;
}
}
if
(
!
exists
)
{
SSchema
*
pSchema
=
&
pFillInfo
->
pTags
[
k
].
col
;
pSchema
->
colId
=
pColInfo
->
col
.
colId
;
pSchema
->
type
=
pColInfo
->
col
.
type
;
pSchema
->
bytes
=
pColInfo
->
col
.
bytes
;
pFillInfo
->
pTags
[
k
].
tagVal
=
calloc
(
1
,
pColInfo
->
col
.
bytes
);
pColInfo
->
tagIndex
=
k
;
k
+=
1
;
}
else
{
pColInfo
->
tagIndex
=
index
;
}
}
rowsize
+=
pColInfo
->
col
.
bytes
;
}
assert
(
k
<=
pFillInfo
->
numOfTags
);
return
rowsize
;
}
static
int32_t
taosNumOfRemainRows
(
SFillInfo
*
pFillInfo
)
{
if
(
pFillInfo
->
numOfRows
==
0
||
(
pFillInfo
->
numOfRows
>
0
&&
pFillInfo
->
index
>=
pFillInfo
->
numOfRows
))
{
return
0
;
}
return
pFillInfo
->
numOfRows
-
pFillInfo
->
index
;
}
SFillInfo
*
taosInitFillInfo
(
int32_t
order
,
TSKEY
skey
,
int32_t
numOfTags
,
int32_t
capacity
,
int32_t
numOfCols
,
int64_t
slidingTime
,
int8_t
slidingUnit
,
int8_t
precision
,
int32_t
fillType
,
SFillColInfo
*
pCol
,
void
*
handle
)
{
if
(
fillType
==
TSDB_FILL_NONE
)
{
return
NULL
;
}
SFillInfo
*
pFillInfo
=
calloc
(
1
,
sizeof
(
SFillInfo
));
taosResetFillInfo
(
pFillInfo
,
skey
);
pFillInfo
->
order
=
order
;
pFillInfo
->
type
=
fillType
;
pFillInfo
->
pFillCol
=
pCol
;
pFillInfo
->
numOfTags
=
numOfTags
;
pFillInfo
->
numOfCols
=
numOfCols
;
pFillInfo
->
precision
=
precision
;
pFillInfo
->
alloc
=
capacity
;
pFillInfo
->
handle
=
handle
;
pFillInfo
->
interval
.
interval
=
slidingTime
;
pFillInfo
->
interval
.
intervalUnit
=
slidingUnit
;
pFillInfo
->
interval
.
sliding
=
slidingTime
;
pFillInfo
->
interval
.
slidingUnit
=
slidingUnit
;
pFillInfo
->
pData
=
malloc
(
POINTER_BYTES
*
numOfCols
);
if
(
numOfTags
>
0
)
{
pFillInfo
->
pTags
=
calloc
(
pFillInfo
->
numOfTags
,
sizeof
(
SFillTagColInfo
));
for
(
int32_t
i
=
0
;
i
<
numOfTags
;
++
i
)
{
pFillInfo
->
pTags
[
i
].
col
.
colId
=
-
2
;
// TODO
}
}
pFillInfo
->
rowSize
=
setTagColumnInfo
(
pFillInfo
,
pFillInfo
->
numOfCols
,
pFillInfo
->
alloc
);
assert
(
pFillInfo
->
rowSize
>
0
);
return
pFillInfo
;
}
void
taosResetFillInfo
(
SFillInfo
*
pFillInfo
,
TSKEY
startTimestamp
)
{
pFillInfo
->
start
=
startTimestamp
;
pFillInfo
->
currentKey
=
startTimestamp
;
pFillInfo
->
index
=
-
1
;
pFillInfo
->
numOfRows
=
0
;
pFillInfo
->
numOfCurrent
=
0
;
pFillInfo
->
numOfTotal
=
0
;
}
void
*
taosDestroyFillInfo
(
SFillInfo
*
pFillInfo
)
{
if
(
pFillInfo
==
NULL
)
{
return
NULL
;
}
tfree
(
pFillInfo
->
prevValues
);
tfree
(
pFillInfo
->
nextValues
);
tfree
(
pFillInfo
->
pTags
);
for
(
int32_t
i
=
0
;
i
<
pFillInfo
->
numOfCols
;
++
i
)
{
tfree
(
pFillInfo
->
pData
[
i
]);
}
tfree
(
pFillInfo
->
pData
);
tfree
(
pFillInfo
->
pFillCol
);
tfree
(
pFillInfo
);
return
NULL
;
}
void
taosFillSetStartInfo
(
SFillInfo
*
pFillInfo
,
int32_t
numOfRows
,
TSKEY
endKey
)
{
if
(
pFillInfo
->
type
==
TSDB_FILL_NONE
)
{
return
;
}
pFillInfo
->
end
=
endKey
;
if
(
!
FILL_IS_ASC_FILL
(
pFillInfo
))
{
pFillInfo
->
end
=
taosTimeTruncate
(
endKey
,
&
pFillInfo
->
interval
,
pFillInfo
->
precision
);
}
pFillInfo
->
index
=
0
;
pFillInfo
->
numOfRows
=
numOfRows
;
// ensure the space
if
(
pFillInfo
->
alloc
<
numOfRows
)
{
for
(
int32_t
i
=
0
;
i
<
pFillInfo
->
numOfCols
;
++
i
)
{
char
*
tmp
=
realloc
(
pFillInfo
->
pData
[
i
],
numOfRows
*
pFillInfo
->
pFillCol
[
i
].
col
.
bytes
);
assert
(
tmp
!=
NULL
);
// todo handle error
memset
(
tmp
,
0
,
numOfRows
*
pFillInfo
->
pFillCol
[
i
].
col
.
bytes
);
pFillInfo
->
pData
[
i
]
=
tmp
;
}
}
}
// copy the data into source data buffer
void
taosFillCopyInputDataFromFilePage
(
SFillInfo
*
pFillInfo
,
const
tFilePage
**
pInput
)
{
for
(
int32_t
i
=
0
;
i
<
pFillInfo
->
numOfCols
;
++
i
)
{
memcpy
(
pFillInfo
->
pData
[
i
],
pInput
[
i
]
->
data
,
pFillInfo
->
numOfRows
*
pFillInfo
->
pFillCol
[
i
].
col
.
bytes
);
}
}
void
taosFillCopyInputDataFromOneFilePage
(
SFillInfo
*
pFillInfo
,
const
tFilePage
*
pInput
)
{
assert
(
pFillInfo
->
numOfRows
==
pInput
->
num
);
for
(
int32_t
i
=
0
;
i
<
pFillInfo
->
numOfCols
;
++
i
)
{
SFillColInfo
*
pCol
=
&
pFillInfo
->
pFillCol
[
i
];
const
char
*
data
=
pInput
->
data
+
pCol
->
col
.
offset
*
pInput
->
num
;
memcpy
(
pFillInfo
->
pData
[
i
],
data
,
(
size_t
)(
pInput
->
num
*
pCol
->
col
.
bytes
));
if
(
TSDB_COL_IS_TAG
(
pCol
->
flag
))
{
// copy the tag value to tag value buffer
SFillTagColInfo
*
pTag
=
&
pFillInfo
->
pTags
[
pCol
->
tagIndex
];
assert
(
pTag
->
col
.
colId
==
pCol
->
col
.
colId
);
memcpy
(
pTag
->
tagVal
,
data
,
pCol
->
col
.
bytes
);
}
}
}
bool
taosFillHasMoreResults
(
SFillInfo
*
pFillInfo
)
{
return
taosNumOfRemainRows
(
pFillInfo
)
>
0
;
}
int64_t
getNumOfResultsAfterFillGap
(
SFillInfo
*
pFillInfo
,
TSKEY
ekey
,
int32_t
maxNumOfRows
)
{
int64_t
*
tsList
=
(
int64_t
*
)
pFillInfo
->
pData
[
0
];
int32_t
numOfRows
=
taosNumOfRemainRows
(
pFillInfo
);
TSKEY
ekey1
=
ekey
;
if
(
!
FILL_IS_ASC_FILL
(
pFillInfo
))
{
pFillInfo
->
end
=
taosTimeTruncate
(
ekey
,
&
pFillInfo
->
interval
,
pFillInfo
->
precision
);
}
int64_t
numOfRes
=
-
1
;
if
(
numOfRows
>
0
)
{
// still fill gap within current data block, not generating data after the result set.
TSKEY
lastKey
=
tsList
[
pFillInfo
->
numOfRows
-
1
];
numOfRes
=
taosTimeCountInterval
(
lastKey
,
pFillInfo
->
currentKey
,
pFillInfo
->
interval
.
sliding
,
pFillInfo
->
interval
.
slidingUnit
,
pFillInfo
->
precision
);
numOfRes
+=
1
;
assert
(
numOfRes
>=
numOfRows
);
}
else
{
// reach the end of data
if
((
ekey1
<
pFillInfo
->
currentKey
&&
FILL_IS_ASC_FILL
(
pFillInfo
))
||
(
ekey1
>
pFillInfo
->
currentKey
&&
!
FILL_IS_ASC_FILL
(
pFillInfo
)))
{
return
0
;
}
numOfRes
=
taosTimeCountInterval
(
ekey1
,
pFillInfo
->
currentKey
,
pFillInfo
->
interval
.
sliding
,
pFillInfo
->
interval
.
slidingUnit
,
pFillInfo
->
precision
);
numOfRes
+=
1
;
}
return
(
numOfRes
>
maxNumOfRows
)
?
maxNumOfRows
:
numOfRes
;
}
int32_t
taosGetLinearInterpolationVal
(
int32_t
type
,
SPoint
*
point1
,
SPoint
*
point2
,
SPoint
*
point
)
{
double
v1
=
-
1
;
double
v2
=
-
1
;
GET_TYPED_DATA
(
v1
,
double
,
type
,
point1
->
val
);
GET_TYPED_DATA
(
v2
,
double
,
type
,
point2
->
val
);
double
r
=
DO_INTERPOLATION
(
v1
,
v2
,
point1
->
key
,
point2
->
key
,
point
->
key
);
switch
(
type
)
{
case
TSDB_DATA_TYPE_TINYINT
:
*
(
int8_t
*
)
point
->
val
=
(
int8_t
)
r
;
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
*
(
int16_t
*
)
point
->
val
=
(
int16_t
)
r
;
break
;
case
TSDB_DATA_TYPE_INT
:
*
(
int32_t
*
)
point
->
val
=
(
int32_t
)
r
;
break
;
case
TSDB_DATA_TYPE_BIGINT
:
*
(
int64_t
*
)
point
->
val
=
(
int64_t
)
r
;
break
;
case
TSDB_DATA_TYPE_DOUBLE
:
*
(
double
*
)
point
->
val
=
(
double
)
r
;
break
;
case
TSDB_DATA_TYPE_FLOAT
:
*
(
float
*
)
point
->
val
=
(
float
)
r
;
break
;
default:
assert
(
0
);
}
return
TSDB_CODE_SUCCESS
;
}
int64_t
taosFillResultDataBlock
(
SFillInfo
*
pFillInfo
,
tFilePage
**
output
,
int32_t
capacity
)
{
int32_t
remain
=
taosNumOfRemainRows
(
pFillInfo
);
int64_t
numOfRes
=
getNumOfRes
WithFill
(
pFillInfo
,
pFillInfo
->
end
,
capacity
);
int64_t
numOfRes
=
getNumOfRes
ultsAfterFillGap
(
pFillInfo
,
pFillInfo
->
end
,
capacity
);
assert
(
numOfRes
<=
capacity
);
// no data existed for fill operation now, append result according to the fill strategy
if
(
remain
==
0
)
{
fillExternalResults
(
pFillInfo
,
output
,
numOfRes
);
appendFilledResult
(
pFillInfo
,
output
,
numOfRes
);
}
else
{
fillResultImpl
(
pFillInfo
,
output
,
(
int32_t
)
numOfRes
);
assert
(
numOfRes
==
pFillInfo
->
numOfCurrent
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录