Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
efd28728
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1193
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
efd28728
编写于
6月 15, 2022
作者:
H
Haojun Liao
提交者:
GitHub
6月 15, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #13779 from taosdata/feature/3_liaohj
enh(query): add derivative function.
上级
39ac85b1
4f8c84cf
变更
24
隐藏空白更改
内联
并排
Showing
24 changed file
with
601 addition
and
746 deletion
+601
-746
docs/examples/.gitignre
docs/examples/.gitignre
+2
-0
include/libs/function/function.h
include/libs/function/function.h
+0
-1
include/libs/function/functionMgt.h
include/libs/function/functionMgt.h
+2
-0
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+1
-4
source/libs/executor/inc/executorInt.h
source/libs/executor/inc/executorInt.h
+2
-0
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+14
-11
source/libs/executor/inc/tfill.h
source/libs/executor/inc/tfill.h
+0
-2
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+0
-246
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+12
-10
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+71
-108
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+7
-11
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+216
-54
source/libs/function/inc/builtinsimpl.h
source/libs/function/inc/builtinsimpl.h
+4
-0
source/libs/function/inc/functionMgtInt.h
source/libs/function/inc/functionMgtInt.h
+2
-1
source/libs/function/inc/texpr.h
source/libs/function/inc/texpr.h
+0
-6
source/libs/function/inc/tunaryoperator.h
source/libs/function/inc/tunaryoperator.h
+0
-32
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+56
-2
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+198
-6
source/libs/function/src/functionMgt.c
source/libs/function/src/functionMgt.c
+2
-0
source/libs/function/src/taggfunction.c
source/libs/function/src/taggfunction.c
+7
-7
source/libs/function/src/texpr.c
source/libs/function/src/texpr.c
+0
-239
source/util/src/tcache.c
source/util/src/tcache.c
+4
-4
tests/system-test/2-query/json_tag.py
tests/system-test/2-query/json_tag.py
+1
-1
tools/taosadapter
tools/taosadapter
+0
-1
未找到文件。
docs/examples/.gitignre
0 → 100644
浏览文件 @
efd28728
.vscode
*.lock
\ No newline at end of file
include/libs/function/function.h
浏览文件 @
efd28728
...
@@ -132,7 +132,6 @@ typedef struct SqlFunctionCtx {
...
@@ -132,7 +132,6 @@ typedef struct SqlFunctionCtx {
char
*
pOutput
;
// final result output buffer, point to sdata->data
char
*
pOutput
;
// final result output buffer, point to sdata->data
int32_t
numOfParams
;
int32_t
numOfParams
;
SFunctParam
*
param
;
// input parameter, e.g., top(k, 20), the number of results for top query is kept in param
SFunctParam
*
param
;
// input parameter, e.g., top(k, 20), the number of results for top query is kept in param
int64_t
*
ptsList
;
// corresponding timestamp array list, todo remove it
SColumnInfoData
*
pTsOutput
;
// corresponding output buffer for timestamp of each result, e.g., top/bottom*/
SColumnInfoData
*
pTsOutput
;
// corresponding output buffer for timestamp of each result, e.g., top/bottom*/
int32_t
offset
;
int32_t
offset
;
struct
SResultRowEntryInfo
*
resultInfo
;
struct
SResultRowEntryInfo
*
resultInfo
;
...
...
include/libs/function/functionMgt.h
浏览文件 @
efd28728
...
@@ -134,6 +134,7 @@ typedef enum EFunctionType {
...
@@ -134,6 +134,7 @@ typedef enum EFunctionType {
FUNCTION_TYPE_HYPERLOGLOG_MERGE
,
FUNCTION_TYPE_HYPERLOGLOG_MERGE
,
FUNCTION_TYPE_ELAPSED_PARTIAL
,
FUNCTION_TYPE_ELAPSED_PARTIAL
,
FUNCTION_TYPE_ELAPSED_MERGE
,
FUNCTION_TYPE_ELAPSED_MERGE
,
FUNCTION_TYPE_TOP_PARTIAL
,
FUNCTION_TYPE_TOP_PARTIAL
,
FUNCTION_TYPE_TOP_MERGE
,
FUNCTION_TYPE_TOP_MERGE
,
FUNCTION_TYPE_BOTTOM_PARTIAL
,
FUNCTION_TYPE_BOTTOM_PARTIAL
,
...
@@ -184,6 +185,7 @@ bool fmIsUserDefinedFunc(int32_t funcId);
...
@@ -184,6 +185,7 @@ bool fmIsUserDefinedFunc(int32_t funcId);
bool
fmIsDistExecFunc
(
int32_t
funcId
);
bool
fmIsDistExecFunc
(
int32_t
funcId
);
bool
fmIsForbidFillFunc
(
int32_t
funcId
);
bool
fmIsForbidFillFunc
(
int32_t
funcId
);
bool
fmIsForbidStreamFunc
(
int32_t
funcId
);
bool
fmIsForbidStreamFunc
(
int32_t
funcId
);
bool
fmIsIntervalInterpoFunc
(
int32_t
funcId
);
int32_t
fmGetDistMethod
(
const
SFunctionNode
*
pFunc
,
SFunctionNode
**
pPartialFunc
,
SFunctionNode
**
pMergeFunc
);
int32_t
fmGetDistMethod
(
const
SFunctionNode
*
pFunc
,
SFunctionNode
**
pPartialFunc
,
SFunctionNode
**
pMergeFunc
);
...
...
source/libs/executor/inc/executil.h
浏览文件 @
efd28728
...
@@ -44,7 +44,6 @@
...
@@ -44,7 +44,6 @@
typedef
struct
SGroupResInfo
{
typedef
struct
SGroupResInfo
{
int32_t
index
;
int32_t
index
;
SArray
*
pRows
;
// SArray<SResKeyPos>
SArray
*
pRows
;
// SArray<SResKeyPos>
int32_t
position
;
}
SGroupResInfo
;
}
SGroupResInfo
;
typedef
struct
SResultRow
{
typedef
struct
SResultRow
{
...
@@ -56,7 +55,7 @@ typedef struct SResultRow {
...
@@ -56,7 +55,7 @@ typedef struct SResultRow {
uint32_t
numOfRows
;
// number of rows of current time window
uint32_t
numOfRows
;
// number of rows of current time window
STimeWindow
win
;
STimeWindow
win
;
struct
SResultRowEntryInfo
pEntryInfo
[];
// For each result column, there is a resultInfo
struct
SResultRowEntryInfo
pEntryInfo
[];
// For each result column, there is a resultInfo
// char *key;
// start key of current result row
// char *key; // start key of current result row
}
SResultRow
;
}
SResultRow
;
typedef
struct
SResultRowPosition
{
typedef
struct
SResultRowPosition
{
...
@@ -71,9 +70,7 @@ typedef struct SResKeyPos {
...
@@ -71,9 +70,7 @@ typedef struct SResKeyPos {
}
SResKeyPos
;
}
SResKeyPos
;
typedef
struct
SResultRowInfo
{
typedef
struct
SResultRowInfo
{
SResultRowPosition
*
pPosition
;
// todo remove this
int32_t
size
;
// number of result set
int32_t
size
;
// number of result set
int32_t
capacity
;
// max capacity
SResultRowPosition
cur
;
SResultRowPosition
cur
;
SList
*
openWindow
;
SList
*
openWindow
;
}
SResultRowInfo
;
}
SResultRowInfo
;
...
...
source/libs/executor/inc/executorInt.h
浏览文件 @
efd28728
...
@@ -20,6 +20,8 @@
...
@@ -20,6 +20,8 @@
extern
"C"
{
extern
"C"
{
#endif
#endif
extern
int32_t
exchangeObjRefPool
;
typedef
struct
{
typedef
struct
{
char
*
pData
;
char
*
pData
;
bool
isNull
;
bool
isNull
;
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
efd28728
...
@@ -181,7 +181,6 @@ typedef struct SExecTaskInfo {
...
@@ -181,7 +181,6 @@ typedef struct SExecTaskInfo {
STaskCostInfo
cost
;
STaskCostInfo
cost
;
int64_t
owner
;
// if it is in execution
int64_t
owner
;
// if it is in execution
int32_t
code
;
int32_t
code
;
// uint64_t totalRows; // total number of rows
struct
{
struct
{
char
*
tablename
;
char
*
tablename
;
char
*
dbname
;
char
*
dbname
;
...
@@ -222,10 +221,10 @@ typedef struct STaskRuntimeEnv {
...
@@ -222,10 +221,10 @@ typedef struct STaskRuntimeEnv {
}
STaskRuntimeEnv
;
}
STaskRuntimeEnv
;
enum
{
enum
{
OP_NOT_OPENED
=
0x0
,
OP_NOT_OPENED
=
0x0
,
OP_OPENED
=
0x1
,
OP_OPENED
=
0x1
,
OP_RES_TO_RETURN
=
0x5
,
OP_RES_TO_RETURN
=
0x5
,
OP_EXEC_DONE
=
0x9
,
OP_EXEC_DONE
=
0x9
,
};
};
typedef
struct
SOperatorFpSet
{
typedef
struct
SOperatorFpSet
{
...
@@ -262,12 +261,12 @@ typedef enum {
...
@@ -262,12 +261,12 @@ typedef enum {
}
EX_SOURCE_STATUS
;
}
EX_SOURCE_STATUS
;
typedef
struct
SSourceDataInfo
{
typedef
struct
SSourceDataInfo
{
struct
SExchangeInfo
*
pEx
;
int32_t
index
;
int32_t
index
;
SRetrieveTableRsp
*
pRsp
;
SRetrieveTableRsp
*
pRsp
;
uint64_t
totalRows
;
uint64_t
totalRows
;
int32_t
code
;
int32_t
code
;
EX_SOURCE_STATUS
status
;
EX_SOURCE_STATUS
status
;
const
char
*
taskId
;
}
SSourceDataInfo
;
}
SSourceDataInfo
;
typedef
struct
SLoadRemoteDataInfo
{
typedef
struct
SLoadRemoteDataInfo
{
...
@@ -285,6 +284,7 @@ typedef struct SExchangeInfo {
...
@@ -285,6 +284,7 @@ typedef struct SExchangeInfo {
bool
seqLoadData
;
// sequential load data or not, false by default
bool
seqLoadData
;
// sequential load data or not, false by default
int32_t
current
;
int32_t
current
;
SLoadRemoteDataInfo
loadInfo
;
SLoadRemoteDataInfo
loadInfo
;
uint64_t
self
;
}
SExchangeInfo
;
}
SExchangeInfo
;
#define COL_MATCH_FROM_COL_ID 0x1
#define COL_MATCH_FROM_COL_ID 0x1
...
@@ -470,10 +470,8 @@ typedef struct SIntervalAggOperatorInfo {
...
@@ -470,10 +470,8 @@ typedef struct SIntervalAggOperatorInfo {
bool
timeWindowInterpo
;
// interpolation needed or not
bool
timeWindowInterpo
;
// interpolation needed or not
char
**
pRow
;
// previous row/tuple of already processed datablock
char
**
pRow
;
// previous row/tuple of already processed datablock
SArray
*
pInterpCols
;
// interpolation columns
SArray
*
pInterpCols
;
// interpolation columns
STableQueryInfo
*
pCurrent
;
// current tableQueryInfo struct
int32_t
order
;
// current SSDataBlock scan order
int32_t
order
;
// current SSDataBlock scan order
EOPTR_EXEC_MODEL
execModel
;
// operator execution model [batch model|stream model]
EOPTR_EXEC_MODEL
execModel
;
// operator execution model [batch model|stream model]
SArray
*
pUpdatedWindow
;
// updated time window due to the input data block from the downstream operator.
STimeWindowAggSupp
twAggSup
;
STimeWindowAggSupp
twAggSup
;
bool
invertible
;
bool
invertible
;
SArray
*
pPrevValues
;
// SArray<SGroupKeys> used to keep the previous not null value for interpolation.
SArray
*
pPrevValues
;
// SArray<SGroupKeys> used to keep the previous not null value for interpolation.
...
@@ -502,8 +500,6 @@ typedef struct SAggOperatorInfo {
...
@@ -502,8 +500,6 @@ typedef struct SAggOperatorInfo {
STableQueryInfo
*
current
;
STableQueryInfo
*
current
;
uint64_t
groupId
;
uint64_t
groupId
;
SGroupResInfo
groupResInfo
;
SGroupResInfo
groupResInfo
;
STableQueryInfo
*
pTableQueryInfo
;
SExprInfo
*
pScalarExprInfo
;
SExprInfo
*
pScalarExprInfo
;
int32_t
numOfScalarExpr
;
// the number of scalar expression before the aggregate function can be applied
int32_t
numOfScalarExpr
;
// the number of scalar expression before the aggregate function can be applied
SqlFunctionCtx
*
pScalarCtx
;
// scalar function requried sql function struct.
SqlFunctionCtx
*
pScalarCtx
;
// scalar function requried sql function struct.
...
@@ -639,8 +635,13 @@ typedef struct SStreamSessionAggOperatorInfo {
...
@@ -639,8 +635,13 @@ typedef struct SStreamSessionAggOperatorInfo {
typedef
struct
STimeSliceOperatorInfo
{
typedef
struct
STimeSliceOperatorInfo
{
SOptrBasicInfo
binfo
;
SOptrBasicInfo
binfo
;
STimeWindow
win
;
SInterval
interval
;
SInterval
interval
;
SGroupResInfo
groupResInfo
;
// multiple results build supporter
int64_t
current
;
SArray
*
pPrevRow
;
// SArray<SGroupValue>
SArray
*
pCols
;
// SArray<SColumn>
int32_t
fillType
;
// fill type
struct
SFillColInfo
*
pFillColInfo
;
// fill column info
}
STimeSliceOperatorInfo
;
}
STimeSliceOperatorInfo
;
typedef
struct
SStateWindowOperatorInfo
{
typedef
struct
SStateWindowOperatorInfo
{
...
@@ -733,6 +734,8 @@ typedef struct SJoinOperatorInfo {
...
@@ -733,6 +734,8 @@ typedef struct SJoinOperatorInfo {
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
void
doDestroyExchangeOperatorInfo
(
void
*
param
);
SOperatorFpSet
createOperatorFpSet
(
__optr_open_fn_t
openFn
,
__optr_fn_t
nextFn
,
__optr_fn_t
streamFn
,
SOperatorFpSet
createOperatorFpSet
(
__optr_open_fn_t
openFn
,
__optr_fn_t
nextFn
,
__optr_fn_t
streamFn
,
__optr_fn_t
cleanup
,
__optr_close_fn_t
closeFn
,
__optr_encode_fn_t
encode
,
__optr_fn_t
cleanup
,
__optr_close_fn_t
closeFn
,
__optr_encode_fn_t
encode
,
__optr_decode_fn_t
decode
,
__optr_explain_fn_t
explain
);
__optr_decode_fn_t
decode
,
__optr_explain_fn_t
explain
);
...
@@ -840,7 +843,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
...
@@ -840,7 +843,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SSDataBlock
*
pResultBlock
,
SArray
*
pGroupColList
,
SExecTaskInfo
*
pTaskInfo
);
SSDataBlock
*
pResultBlock
,
SArray
*
pGroupColList
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTimeSliceOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SOperatorInfo
*
createTimeSliceOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SExecTaskInfo
*
pTaskInfo
);
SSDataBlock
*
pResultBlock
,
const
SNodeListNode
*
pValNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createMergeJoinOperatorInfo
(
SOperatorInfo
**
pDownstream
,
int32_t
numOfDownstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SNode
*
pOnCondition
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createMergeJoinOperatorInfo
(
SOperatorInfo
**
pDownstream
,
int32_t
numOfDownstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SNode
*
pOnCondition
,
SExecTaskInfo
*
pTaskInfo
);
...
...
source/libs/executor/inc/tfill.h
浏览文件 @
efd28728
...
@@ -28,8 +28,6 @@ struct SSDataBlock;
...
@@ -28,8 +28,6 @@ struct SSDataBlock;
typedef
struct
SFillColInfo
{
typedef
struct
SFillColInfo
{
SExprInfo
*
pExpr
;
SExprInfo
*
pExpr
;
// SResSchema schema;
// int16_t functionId; // sql function id
int16_t
flag
;
// column flag: TAG COLUMN|NORMAL COLUMN
int16_t
flag
;
// column flag: TAG COLUMN|NORMAL COLUMN
int16_t
tagIndex
;
// index of current tag in SFillTagColInfo array list
int16_t
tagIndex
;
// index of current tag in SFillTagColInfo array list
SVariant
fillVal
;
SVariant
fillVal
;
...
...
source/libs/executor/src/executil.c
浏览文件 @
efd28728
...
@@ -41,13 +41,7 @@ int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) {
...
@@ -41,13 +41,7 @@ int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) {
int32_t
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
size
)
{
int32_t
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
size
)
{
pResultRowInfo
->
size
=
0
;
pResultRowInfo
->
size
=
0
;
pResultRowInfo
->
capacity
=
size
;
pResultRowInfo
->
cur
.
pageId
=
-
1
;
pResultRowInfo
->
cur
.
pageId
=
-
1
;
pResultRowInfo
->
pPosition
=
taosMemoryCalloc
(
pResultRowInfo
->
capacity
,
sizeof
(
SResultRowPosition
));
if
(
pResultRowInfo
->
pPosition
==
NULL
)
{
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -56,25 +50,14 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) {
...
@@ -56,25 +50,14 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) {
return
;
return
;
}
}
if
(
pResultRowInfo
->
capacity
==
0
)
{
// assert(pResultRowInfo->pResult == NULL);
return
;
}
for
(
int32_t
i
=
0
;
i
<
pResultRowInfo
->
size
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pResultRowInfo
->
size
;
++
i
)
{
// if (pResultRowInfo->pResult[i]) {
// if (pResultRowInfo->pResult[i]) {
// taosMemoryFreeClear(pResultRowInfo->pResult[i]->key);
// taosMemoryFreeClear(pResultRowInfo->pResult[i]->key);
// }
// }
}
}
taosMemoryFreeClear
(
pResultRowInfo
->
pPosition
);
}
}
void
resetResultRowInfo
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
)
{
void
resetResultRowInfo
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
)
{
if
(
pResultRowInfo
==
NULL
||
pResultRowInfo
->
capacity
==
0
)
{
return
;
}
for
(
int32_t
i
=
0
;
i
<
pResultRowInfo
->
size
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pResultRowInfo
->
size
;
++
i
)
{
// SResultRow *pWindowRes = pResultRowInfo->pResult[i];
// SResultRow *pWindowRes = pResultRowInfo->pResult[i];
// clearResultRow(pRuntimeEnv, pWindowRes);
// clearResultRow(pRuntimeEnv, pWindowRes);
...
@@ -288,232 +271,3 @@ void orderTheResultRows(STaskRuntimeEnv* pRuntimeEnv) {
...
@@ -288,232 +271,3 @@ void orderTheResultRows(STaskRuntimeEnv* pRuntimeEnv) {
taosArraySort
(
pRuntimeEnv
->
pResultRowArrayList
,
fn
);
taosArraySort
(
pRuntimeEnv
->
pResultRowArrayList
,
fn
);
}
}
static
int32_t
mergeIntoGroupResultImplRv
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SGroupResInfo
*
pGroupResInfo
,
uint64_t
groupId
,
int32_t
*
rowCellInfoOffset
)
{
if
(
pGroupResInfo
->
pRows
==
NULL
)
{
pGroupResInfo
->
pRows
=
taosArrayInit
(
100
,
POINTER_BYTES
);
}
size_t
len
=
taosArrayGetSize
(
pRuntimeEnv
->
pResultRowArrayList
);
for
(;
pGroupResInfo
->
position
<
len
;
++
pGroupResInfo
->
position
)
{
SResultRowCell
*
pResultRowCell
=
taosArrayGet
(
pRuntimeEnv
->
pResultRowArrayList
,
pGroupResInfo
->
position
);
if
(
pResultRowCell
->
groupId
!=
groupId
)
{
break
;
}
int64_t
num
=
0
;
//getNumOfResultWindowRes(pRuntimeEnv, &pResultRowCell->pos, rowCellInfoOffset);
if
(
num
<=
0
)
{
continue
;
}
taosArrayPush
(
pGroupResInfo
->
pRows
,
&
pResultRowCell
->
pos
);
// pResultRowCell->pRow->numOfRows = (uint32_t) num;
}
return
TSDB_CODE_SUCCESS
;
}
static
UNUSED_FUNC
int32_t
mergeIntoGroupResultImpl
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SGroupResInfo
*
pGroupResInfo
,
SArray
*
pTableList
,
int32_t
*
rowCellInfoOffset
)
{
bool
ascQuery
=
true
;
#if 0
int32_t code = TSDB_CODE_SUCCESS;
int32_t *posList = NULL;
SMultiwayMergeTreeInfo *pTree = NULL;
STableQueryInfo **pTableQueryInfoList = NULL;
size_t size = taosArrayGetSize(pTableList);
if (pGroupResInfo->pRows == NULL) {
pGroupResInfo->pRows = taosArrayInit(100, POINTER_BYTES);
}
posList = taosMemoryCalloc(size, sizeof(int32_t));
pTableQueryInfoList = taosMemoryMalloc(POINTER_BYTES * size);
if (pTableQueryInfoList == NULL || posList == NULL || pGroupResInfo->pRows == NULL || pGroupResInfo->pRows == NULL) {
// qError("QInfo:%"PRIu64" failed alloc memory", GET_TASKID(pRuntimeEnv));
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _end;
}
int32_t numOfTables = 0;
for (int32_t i = 0; i < size; ++i) {
STableQueryInfo *item = taosArrayGetP(pTableList, i);
// if (item->resInfo.size > 0) {
// pTableQueryInfoList[numOfTables++] = item;
// }
}
// there is no data in current group
// no need to merge results since only one table in each group
// if (numOfTables == 0) {
// goto _end;
// }
int32_t order = TSDB_ORDER_ASC;
SCompSupporter cs = {pTableQueryInfoList, posList, order};
int32_t ret = tMergeTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn);
if (ret != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _end;
}
int64_t lastTimestamp = ascQuery? INT64_MIN:INT64_MAX;
int64_t startt = taosGetTimestampMs();
while (1) {
int32_t tableIndex = tMergeTreeGetChosenIndex(pTree);
SResultRowInfo *pWindowResInfo = &pTableQueryInfoList[tableIndex]->resInfo;
ASSERT(0);
SResultRow *pWindowRes = NULL;//getResultRow(pBuf, pWindowResInfo, cs.rowIndex[tableIndex]);
int64_t num = 0;//getNumOfResultWindowRes(pRuntimeEnv, pWindowRes, rowCellInfoOffset);
if (num <= 0) {
cs.rowIndex[tableIndex] += 1;
if (cs.rowIndex[tableIndex] >= pWindowResInfo->size) {
cs.rowIndex[tableIndex] = -1;
if (--numOfTables == 0) { // all input sources are exhausted
break;
}
}
} else {
assert((pWindowRes->win.skey >= lastTimestamp && ascQuery) || (pWindowRes->win.skey <= lastTimestamp && !ascQuery));
if (pWindowRes->win.skey != lastTimestamp) {
taosArrayPush(pGroupResInfo->pRows, &pWindowRes);
pWindowRes->numOfRows = (uint32_t) num;
}
lastTimestamp = pWindowRes->win.skey;
// move to the next row of current entry
if ((++cs.rowIndex[tableIndex]) >= pWindowResInfo->size) {
cs.rowIndex[tableIndex] = -1;
// all input sources are exhausted
if ((--numOfTables) == 0) {
break;
}
}
}
tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
}
int64_t endt = taosGetTimestampMs();
// qDebug("QInfo:%"PRIx64" result merge completed for group:%d, elapsed time:%" PRId64 " ms", GET_TASKID(pRuntimeEnv),
// pGroupResInfo->currentGroup, endt - startt);
_end:
taosMemoryFreeClear(pTableQueryInfoList);
taosMemoryFreeClear(posList);
taosMemoryFreeClear(pTree);
return code;
}
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, STaskRuntimeEnv* pRuntimeEnv, int32_t* offset) {
int64_t st = taosGetTimestampUs();
while (pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) {
mergeIntoGroupResultImplRv(pRuntimeEnv, pGroupResInfo, pGroupResInfo->currentGroup, offset);
// this group generates at least one result, return results
if (taosArrayGetSize(pGroupResInfo->pRows) > 0) {
break;
}
// qDebug("QInfo:%"PRIu64" no result in group %d, continue", GET_TASKID(pRuntimeEnv), pGroupResInfo->currentGroup);
cleanupGroupResInfo(pGroupResInfo);
incNextGroup(pGroupResInfo);
}
// int64_t elapsedTime = taosGetTimestampUs() - st;
// qDebug("QInfo:%"PRIu64" merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", GET_TASKID(pRuntimeEnv),
// pGroupResInfo->currentGroup, pGroupResInfo->totalGroup, elapsedTime);
#endif
return
TSDB_CODE_SUCCESS
;
}
//void blockDistInfoToBinary(STableBlockDist* pDist, struct SBufferWriter* bw) {
// tbufWriteUint32(bw, pDist->numOfTables);
// tbufWriteUint16(bw, pDist->numOfFiles);
// tbufWriteUint64(bw, pDist->totalSize);
// tbufWriteUint64(bw, pDist->totalRows);
// tbufWriteInt32(bw, pDist->maxRows);
// tbufWriteInt32(bw, pDist->minRows);
// tbufWriteUint32(bw, pDist->numOfInmemRows);
// tbufWriteUint32(bw, pDist->numOfSmallBlocks);
// tbufWriteUint64(bw, taosArrayGetSize(pDist->dataBlockInfos));
//
// // compress the binary string
// char* p = TARRAY_GET_START(pDist->dataBlockInfos);
//
// // compress extra bytes
// size_t x = taosArrayGetSize(pDist->dataBlockInfos) * pDist->dataBlockInfos->elemSize;
// char* tmp = taosMemoryMalloc(x + 2);
//
// bool comp = false;
// int32_t len = tsCompressString(p, (int32_t)x, 1, tmp, (int32_t)x, ONE_STAGE_COMP, NULL, 0);
// if (len == -1 || len >= x) { // compress failed, do not compress this binary data
// comp = false;
// len = (int32_t)x;
// } else {
// comp = true;
// }
//
// tbufWriteUint8(bw, comp);
// tbufWriteUint32(bw, len);
// if (comp) {
// tbufWriteBinary(bw, tmp, len);
// } else {
// tbufWriteBinary(bw, p, len);
// }
// taosMemoryFreeClear(tmp);
//}
//void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDist) {
// SBufferReader br = tbufInitReader(data, len, false);
//
// pDist->numOfTables = tbufReadUint32(&br);
// pDist->numOfFiles = tbufReadUint16(&br);
// pDist->totalSize = tbufReadUint64(&br);
// pDist->totalRows = tbufReadUint64(&br);
// pDist->maxRows = tbufReadInt32(&br);
// pDist->minRows = tbufReadInt32(&br);
// pDist->numOfInmemRows = tbufReadUint32(&br);
// pDist->numOfSmallBlocks = tbufReadUint32(&br);
// int64_t numSteps = tbufReadUint64(&br);
//
// bool comp = tbufReadUint8(&br);
// uint32_t compLen = tbufReadUint32(&br);
//
// size_t originalLen = (size_t) (numSteps *sizeof(SFileBlockInfo));
//
// char* outputBuf = NULL;
// if (comp) {
// outputBuf = taosMemoryMalloc(originalLen);
//
// size_t actualLen = compLen;
// const char* compStr = tbufReadBinary(&br, &actualLen);
//
// int32_t orignalLen = tsDecompressString(compStr, compLen, 1, outputBuf,
// (int32_t)originalLen , ONE_STAGE_COMP, NULL, 0);
// assert(orignalLen == numSteps *sizeof(SFileBlockInfo));
// } else {
// outputBuf = (char*) tbufReadBinary(&br, &originalLen);
// }
//
// pDist->dataBlockInfos = taosArrayFromList(outputBuf, (uint32_t)numSteps, sizeof(SFileBlockInfo));
// if (comp) {
// taosMemoryFreeClear(outputBuf);
// }
//}
source/libs/executor/src/executorMain.c
浏览文件 @
efd28728
...
@@ -13,28 +13,30 @@
...
@@ -13,28 +13,30 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include <vnode.h>
#include "dataSinkMgt.h"
#include "texception.h"
#include "os.h"
#include "os.h"
#include "tarray.h"
#include "tref.h"
#include "tcache.h"
#include "dataSinkMgt.h"
#include "tglobal.h"
#include "tmsg.h"
#include "tmsg.h"
#include "tudf.h"
#include "tudf.h"
#include "executor.h"
#include "executor.h"
#include "executorimpl.h"
#include "executorimpl.h"
#include "query.h"
#include "query.h"
#include "thash.h"
#include "tlosertree.h"
static
TdThreadOnce
initPoolOnce
=
PTHREAD_ONCE_INIT
;
#include "ttypes.h"
int32_t
exchangeObjRefPool
=
-
1
;
static
void
initRefPool
()
{
exchangeObjRefPool
=
taosOpenRef
(
1024
,
doDestroyExchangeOperatorInfo
);
}
int32_t
qCreateExecTask
(
SReadHandle
*
readHandle
,
int32_t
vgId
,
uint64_t
taskId
,
SSubplan
*
pSubplan
,
int32_t
qCreateExecTask
(
SReadHandle
*
readHandle
,
int32_t
vgId
,
uint64_t
taskId
,
SSubplan
*
pSubplan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
,
EOPTR_EXEC_MODEL
model
)
{
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
,
EOPTR_EXEC_MODEL
model
)
{
assert
(
readHandle
!=
NULL
&&
pSubplan
!=
NULL
);
assert
(
readHandle
!=
NULL
&&
pSubplan
!=
NULL
);
SExecTaskInfo
**
pTask
=
(
SExecTaskInfo
**
)
pTaskInfo
;
SExecTaskInfo
**
pTask
=
(
SExecTaskInfo
**
)
pTaskInfo
;
taosThreadOnce
(
&
initPoolOnce
,
initRefPool
);
int32_t
code
=
createExecTaskInfoImpl
(
pSubplan
,
pTask
,
readHandle
,
taskId
,
model
);
int32_t
code
=
createExecTaskInfoImpl
(
pSubplan
,
pTask
,
readHandle
,
taskId
,
model
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
goto
_error
;
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
efd28728
...
@@ -13,6 +13,7 @@
...
@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include "tref.h"
#include "filter.h"
#include "filter.h"
#include "function.h"
#include "function.h"
#include "functionMgt.h"
#include "functionMgt.h"
...
@@ -37,9 +38,6 @@
...
@@ -37,9 +38,6 @@
#include "vnode.h"
#include "vnode.h"
#define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN)
#define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN)
#define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN)
#define IS_REPEAT_SCAN(runtime) ((runtime)->scanFlag == REPEAT_SCAN)
#define SET_MAIN_SCAN_FLAG(runtime) ((runtime)->scanFlag = MAIN_SCAN)
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
#define SDATA_BLOCK_INITIALIZER \
#define SDATA_BLOCK_INITIALIZER \
...
@@ -47,12 +45,6 @@
...
@@ -47,12 +45,6 @@
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
enum
{
TS_JOIN_TS_EQUAL
=
0
,
TS_JOIN_TS_NOT_EQUALS
=
1
,
TS_JOIN_TAG_NOT_EQUALS
=
2
,
};
#if 0
#if 0
static UNUSED_FUNC void *u_malloc (size_t __size) {
static UNUSED_FUNC void *u_malloc (size_t __size) {
uint32_t v = taosRand();
uint32_t v = taosRand();
...
@@ -88,7 +80,6 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
...
@@ -88,7 +80,6 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
#endif
#endif
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
//#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList)
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)
int32_t
getMaximumIdleDurationSec
()
{
return
tsShellActivityTimer
*
2
;
}
int32_t
getMaximumIdleDurationSec
()
{
return
tsShellActivityTimer
*
2
;
}
...
@@ -110,7 +101,6 @@ static void releaseQueryBuf(size_t numOfTables);
...
@@ -110,7 +101,6 @@ static void releaseQueryBuf(size_t numOfTables);
static
void
destroySFillOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
);
static
void
destroySFillOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
);
static
void
destroyProjectOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
);
static
void
destroyProjectOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
);
static
void
destroyTagScanOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
);
static
void
destroyOrderOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
);
static
void
destroyOrderOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
);
static
void
destroyAggOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
);
static
void
destroyAggOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
);
...
@@ -118,7 +108,6 @@ static void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput);
...
@@ -118,7 +108,6 @@ static void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput);
static
void
destroyExchangeOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
);
static
void
destroyExchangeOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
);
static
void
destroyOperatorInfo
(
SOperatorInfo
*
pOperator
);
static
void
destroyOperatorInfo
(
SOperatorInfo
*
pOperator
);
static
void
destroySysTableScannerOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
);
void
doSetOperatorCompleted
(
SOperatorInfo
*
pOperator
)
{
void
doSetOperatorCompleted
(
SOperatorInfo
*
pOperator
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
pOperator
->
status
=
OP_EXEC_DONE
;
...
@@ -562,10 +551,6 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow
...
@@ -562,10 +551,6 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow
pCtx
[
k
].
input
.
startRowIndex
=
offset
;
pCtx
[
k
].
input
.
startRowIndex
=
offset
;
pCtx
[
k
].
input
.
numOfRows
=
forwardStep
;
pCtx
[
k
].
input
.
numOfRows
=
forwardStep
;
if
(
tsCol
!=
NULL
)
{
pCtx
[
k
].
ptsList
=
tsCol
;
}
// not a whole block involved in query processing, statistics data can not be used
// not a whole block involved in query processing, statistics data can not be used
// NOTE: the original value of isSet have been changed here
// NOTE: the original value of isSet have been changed here
if
(
pCtx
[
k
].
input
.
colDataAggIsSet
&&
forwardStep
<
numOfTotal
)
{
if
(
pCtx
[
k
].
input
.
colDataAggIsSet
&&
forwardStep
<
numOfTotal
)
{
...
@@ -1133,40 +1118,6 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
...
@@ -1133,40 +1118,6 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
pCtx
->
increase
=
false
;
pCtx
->
increase
=
false
;
pCtx
->
param
=
pFunct
->
pParam
;
pCtx
->
param
=
pFunct
->
pParam
;
// for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
// // set the order information for top/bottom query
// int32_t functionId = pCtx->functionId;
// if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) {
// int32_t f = getExprFunctionId(&pExpr[0]);
// assert(f == FUNCTION_TS || f == FUNCTION_TS_DUMMY);
//
// // pCtx->param[2].i = pQueryAttr->order.order;
// // pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
// // pCtx->param[3].i = functionId;
// // pCtx->param[3].nType = TSDB_DATA_TYPE_BIGINT;
//
// // pCtx->param[1].i = pQueryAttr->order.col.info.colId;
// } else if (functionId == FUNCTION_INTERP) {
// // pCtx->param[2].i = (int8_t)pQueryAttr->fillType;
// // if (pQueryAttr->fillVal != NULL) {
// // if (isNull((const char *)&pQueryAttr->fillVal[i], pCtx->inputType)) {
// // pCtx->param[1].nType = TSDB_DATA_TYPE_NULL;
// // } else { // todo refactor, taosVariantCreateFromBinary should handle the NULL value
// // if (pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) {
// // taosVariantCreateFromBinary(&pCtx->param[1], (char *)&pQueryAttr->fillVal[i],
// pCtx->inputBytes, pCtx->inputType);
// // }
// // }
// // }
// } else if (functionId == FUNCTION_TWA) {
// // pCtx->param[1].i = pQueryAttr->window.skey;
// // pCtx->param[1].nType = TSDB_DATA_TYPE_BIGINT;
// // pCtx->param[2].i = pQueryAttr->window.ekey;
// // pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
// } else if (functionId == FUNCTION_ARITHM) {
// // pCtx->param[1].pz = (char*) getScalarFuncSupport(pRuntimeEnv->scalarSup, i);
// }
// }
}
}
for
(
int32_t
i
=
1
;
i
<
numOfOutput
;
++
i
)
{
for
(
int32_t
i
=
1
;
i
<
numOfOutput
;
++
i
)
{
...
@@ -2438,44 +2389,44 @@ static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQuer
...
@@ -2438,44 +2389,44 @@ static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQuer
#endif
#endif
}
}
// static void updateTableIdInfo(STableQueryInfo* pTableQueryInfo, SSDataBlock* pBlock, SHashObj* pTableIdInfo, int32_t
typedef
struct
SFetchRspHandleWrapper
{
// order) {
uint32_t
exchangeId
;
// int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
int32_t
sourceIndex
;
// pTableQueryInfo->lastKey = ((order == TSDB_ORDER_ASC)? pBlock->info.window.ekey:pBlock->info.window.skey) + step;
}
SFetchRspHandleWrapper
;
//
// if (pTableQueryInfo->pTable == NULL) {
// return;
// }
//
// STableIdInfo tidInfo = createTableIdInfo(pTableQueryInfo);
// STableIdInfo *idinfo = taosHashGet(pTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid));
// if (idinfo != NULL) {
// assert(idinfo->tid == tidInfo.tid && idinfo->uid == tidInfo.uid);
// idinfo->key = tidInfo.key;
// } else {
// taosHashPut(pTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid), &tidInfo, sizeof(STableIdInfo));
// }
// }
int32_t
loadRemoteDataCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
int32_t
loadRemoteDataCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
SSourceDataInfo
*
pSourceDataInfo
=
(
SSourceDataInfo
*
)
param
;
SFetchRspHandleWrapper
*
pWrapper
=
(
SFetchRspHandleWrapper
*
)
param
;
SExchangeInfo
*
pExchangeInfo
=
taosAcquireRef
(
exchangeObjRefPool
,
pWrapper
->
exchangeId
);
if
(
pExchangeInfo
==
NULL
)
{
qWarn
(
"failed to acquire exchange operator, since it may have been released"
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
index
=
pWrapper
->
sourceIndex
;
SSourceDataInfo
*
pSourceDataInfo
=
taosArrayGet
(
pExchangeInfo
->
pSourceDataInfo
,
index
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
if
(
code
==
TSDB_CODE_SUCCESS
)
{
pSourceDataInfo
->
pRsp
=
pMsg
->
pData
;
pSourceDataInfo
->
pRsp
=
pMsg
->
pData
;
SRetrieveTableRsp
*
pRsp
=
pSourceDataInfo
->
pRsp
;
SRetrieveTableRsp
*
pRsp
=
pSourceDataInfo
->
pRsp
;
pRsp
->
numOfRows
=
htonl
(
pRsp
->
numOfRows
);
pRsp
->
numOfRows
=
htonl
(
pRsp
->
numOfRows
);
pRsp
->
compLen
=
htonl
(
pRsp
->
compLen
);
pRsp
->
compLen
=
htonl
(
pRsp
->
compLen
);
pRsp
->
numOfCols
=
htonl
(
pRsp
->
numOfCols
);
pRsp
->
numOfCols
=
htonl
(
pRsp
->
numOfCols
);
pRsp
->
useconds
=
htobe64
(
pRsp
->
useconds
);
pRsp
->
useconds
=
htobe64
(
pRsp
->
useconds
);
ASSERT
(
p
SourceDataInfo
->
p
Rsp
!=
NULL
);
ASSERT
(
pRsp
!=
NULL
);
qDebug
(
"
fetch rsp received, index:%d, rows:%d"
,
pSourceDataInfo
->
index
,
pRsp
->
numOfRows
);
qDebug
(
"
%s fetch rsp received, index:%d, rows:%d"
,
pSourceDataInfo
->
taskId
,
index
,
pRsp
->
numOfRows
);
}
else
{
}
else
{
pSourceDataInfo
->
code
=
code
;
pSourceDataInfo
->
code
=
code
;
}
}
pSourceDataInfo
->
status
=
EX_SOURCE_DATA_READY
;
pSourceDataInfo
->
status
=
EX_SOURCE_DATA_READY
;
tsem_post
(
&
pSourceDataInfo
->
pEx
->
ready
);
tsem_post
(
&
pExchangeInfo
->
ready
);
taosReleaseRef
(
exchangeObjRefPool
,
pWrapper
->
exchangeId
);
taosMemoryFree
(
pWrapper
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -2524,9 +2475,9 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
...
@@ -2524,9 +2475,9 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
pSource
->
addr
.
nodeId
,
pSource
->
addr
.
epSet
.
eps
[
0
].
fqdn
,
pSource
->
taskId
,
sourceIndex
,
totalSources
);
pSource
->
addr
.
nodeId
,
pSource
->
addr
.
epSet
.
eps
[
0
].
fqdn
,
pSource
->
taskId
,
sourceIndex
,
totalSources
);
pMsg
->
header
.
vgId
=
htonl
(
pSource
->
addr
.
nodeId
);
pMsg
->
header
.
vgId
=
htonl
(
pSource
->
addr
.
nodeId
);
pMsg
->
sId
=
htobe64
(
pSource
->
schedId
);
pMsg
->
sId
=
htobe64
(
pSource
->
schedId
);
pMsg
->
taskId
=
htobe64
(
pSource
->
taskId
);
pMsg
->
taskId
=
htobe64
(
pSource
->
taskId
);
pMsg
->
queryId
=
htobe64
(
pTaskInfo
->
id
.
queryId
);
pMsg
->
queryId
=
htobe64
(
pTaskInfo
->
id
.
queryId
);
// send the fetch remote task result reques
// send the fetch remote task result reques
SMsgSendInfo
*
pMsgSendInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SMsgSendInfo
));
SMsgSendInfo
*
pMsgSendInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SMsgSendInfo
));
...
@@ -2537,11 +2488,15 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
...
@@ -2537,11 +2488,15 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
return
pTaskInfo
->
code
;
return
pTaskInfo
->
code
;
}
}
pMsgSendInfo
->
param
=
pDataInfo
;
SFetchRspHandleWrapper
*
pWrapper
=
taosMemoryCalloc
(
1
,
sizeof
(
SFetchRspHandleWrapper
));
pWrapper
->
exchangeId
=
pExchangeInfo
->
self
;
pWrapper
->
sourceIndex
=
sourceIndex
;
pMsgSendInfo
->
param
=
pWrapper
;
pMsgSendInfo
->
msgInfo
.
pData
=
pMsg
;
pMsgSendInfo
->
msgInfo
.
pData
=
pMsg
;
pMsgSendInfo
->
msgInfo
.
len
=
sizeof
(
SResFetchReq
);
pMsgSendInfo
->
msgInfo
.
len
=
sizeof
(
SResFetchReq
);
pMsgSendInfo
->
msgType
=
TDMT_VND_FETCH
;
pMsgSendInfo
->
msgType
=
TDMT_VND_FETCH
;
pMsgSendInfo
->
fp
=
loadRemoteDataCallback
;
pMsgSendInfo
->
fp
=
loadRemoteDataCallback
;
int64_t
transporterId
=
0
;
int64_t
transporterId
=
0
;
int32_t
code
=
asyncSendMsgToServer
(
pExchangeInfo
->
pTransporter
,
&
pSource
->
addr
.
epSet
,
&
transporterId
,
pMsgSendInfo
);
int32_t
code
=
asyncSendMsgToServer
(
pExchangeInfo
->
pTransporter
,
&
pSource
->
addr
.
epSet
,
&
transporterId
,
pMsgSendInfo
);
...
@@ -2689,10 +2644,10 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
...
@@ -2689,10 +2644,10 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
SSDataBlock
*
pRes
=
pExchangeInfo
->
pResult
;
SSDataBlock
*
pRes
=
pExchangeInfo
->
pResult
;
SLoadRemoteDataInfo
*
pLoadInfo
=
&
pExchangeInfo
->
loadInfo
;
SLoadRemoteDataInfo
*
pLoadInfo
=
&
pExchangeInfo
->
loadInfo
;
if
(
pRsp
->
numOfRows
==
0
)
{
if
(
pRsp
->
numOfRows
==
0
)
{
qDebug
(
"%s vgId:%d, taskI
D
:0x%"
PRIx64
" index:%d completed, rowsOfSource:%"
PRIu64
", totalRows:%"
PRIu64
qDebug
(
"%s vgId:%d, taskI
d
:0x%"
PRIx64
" index:%d completed, rowsOfSource:%"
PRIu64
", totalRows:%"
PRIu64
"
try next"
,
"
, completed:%d try next %d/%"
PRIzu
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
i
+
1
,
pDataInfo
->
totalRows
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
i
,
pDataInfo
->
totalRows
,
pExchangeInfo
->
loadInfo
.
totalRows
);
pExchangeInfo
->
loadInfo
.
totalRows
,
completed
+
1
,
i
+
1
,
totalSources
);
pDataInfo
->
status
=
EX_SOURCE_DATA_EXHAUSTED
;
pDataInfo
->
status
=
EX_SOURCE_DATA_EXHAUSTED
;
completed
+=
1
;
completed
+=
1
;
taosMemoryFreeClear
(
pDataInfo
->
pRsp
);
taosMemoryFreeClear
(
pDataInfo
->
pRsp
);
...
@@ -2708,10 +2663,11 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
...
@@ -2708,10 +2663,11 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
}
}
if
(
pRsp
->
completed
==
1
)
{
if
(
pRsp
->
completed
==
1
)
{
qDebug
(
"%s fetch msg rsp from vgId:%d, taskId:0x%"
PRIx64
" numOfRows:%d, rowsOfSource:%"
PRIu64
qDebug
(
"%s fetch msg rsp from vgId:%d, taskId:0x%"
PRIx64
" index:%d completed, numOfRows:%d, rowsOfSource:%"
PRIu64
", totalRows:%"
PRIu64
", totalBytes:%"
PRIu64
" try next %d/%"
PRIzu
,
", totalRows:%"
PRIu64
", totalBytes:%"
PRIu64
", completed:%d try next %d/%"
PRIzu
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
pRes
->
info
.
rows
,
pDataInfo
->
totalRows
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
i
,
pRes
->
info
.
rows
,
pDataInfo
->
totalRows
,
pLoadInfo
->
totalRows
,
pLoadInfo
->
totalSize
,
i
+
1
,
totalSources
);
pLoadInfo
->
totalRows
,
pLoadInfo
->
totalSize
,
completed
+
1
,
i
+
1
,
totalSources
);
completed
+=
1
;
pDataInfo
->
status
=
EX_SOURCE_DATA_EXHAUSTED
;
pDataInfo
->
status
=
EX_SOURCE_DATA_EXHAUSTED
;
}
else
{
}
else
{
qDebug
(
"%s fetch msg rsp from vgId:%d, taskId:0x%"
PRIx64
" numOfRows:%d, totalRows:%"
PRIu64
qDebug
(
"%s fetch msg rsp from vgId:%d, taskId:0x%"
PRIx64
" numOfRows:%d, totalRows:%"
PRIu64
...
@@ -2761,13 +2717,13 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
...
@@ -2761,13 +2717,13 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
}
}
int64_t
endTs
=
taosGetTimestampUs
();
int64_t
endTs
=
taosGetTimestampUs
();
qDebug
(
"%s send all fetch requests to %"
PRIzu
" sources completed, elapsed:%
"
PRId64
,
GET_TASKID
(
pTaskInfo
),
qDebug
(
"%s send all fetch requests to %"
PRIzu
" sources completed, elapsed:%
.2fms"
,
GET_TASKID
(
pTaskInfo
),
totalSources
,
endTs
-
startTs
);
totalSources
,
(
endTs
-
startTs
)
/
1000
.
0
);
tsem_wait
(
&
pExchangeInfo
->
ready
);
pOperator
->
status
=
OP_RES_TO_RETURN
;
pOperator
->
status
=
OP_RES_TO_RETURN
;
pOperator
->
cost
.
openCost
=
taosGetTimestampUs
()
-
startTs
;
pOperator
->
cost
.
openCost
=
taosGetTimestampUs
()
-
startTs
;
tsem_wait
(
&
pExchangeInfo
->
ready
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -2883,7 +2839,7 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
...
@@ -2883,7 +2839,7 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
}
}
}
}
static
int32_t
initDataSource
(
int32_t
numOfSources
,
SExchangeInfo
*
pInfo
)
{
static
int32_t
initDataSource
(
int32_t
numOfSources
,
SExchangeInfo
*
pInfo
,
const
char
*
id
)
{
pInfo
->
pSourceDataInfo
=
taosArrayInit
(
numOfSources
,
sizeof
(
SSourceDataInfo
));
pInfo
->
pSourceDataInfo
=
taosArrayInit
(
numOfSources
,
sizeof
(
SSourceDataInfo
));
if
(
pInfo
->
pSourceDataInfo
==
NULL
)
{
if
(
pInfo
->
pSourceDataInfo
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_OUT_OF_MEMORY
;
...
@@ -2892,11 +2848,10 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) {
...
@@ -2892,11 +2848,10 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) {
for
(
int32_t
i
=
0
;
i
<
numOfSources
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfSources
;
++
i
)
{
SSourceDataInfo
dataInfo
=
{
0
};
SSourceDataInfo
dataInfo
=
{
0
};
dataInfo
.
status
=
EX_SOURCE_DATA_NOT_READY
;
dataInfo
.
status
=
EX_SOURCE_DATA_NOT_READY
;
dataInfo
.
pEx
=
pInfo
;
dataInfo
.
taskId
=
id
;
dataInfo
.
index
=
i
;
dataInfo
.
index
=
i
;
SSourceDataInfo
*
pDs
=
taosArrayPush
(
pInfo
->
pSourceDataInfo
,
&
dataInfo
);
void
*
ret
=
taosArrayPush
(
pInfo
->
pSourceDataInfo
,
&
dataInfo
);
if
(
pDs
==
NULL
)
{
if
(
ret
==
NULL
)
{
taosArrayDestroy
(
pInfo
->
pSourceDataInfo
);
taosArrayDestroy
(
pInfo
->
pSourceDataInfo
);
return
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
...
@@ -2924,7 +2879,9 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
...
@@ -2924,7 +2879,9 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
taosArrayPush
(
pInfo
->
pSources
,
pNode
);
taosArrayPush
(
pInfo
->
pSources
,
pNode
);
}
}
return
initDataSource
(
numOfSources
,
pInfo
);
pInfo
->
self
=
taosAddRef
(
exchangeObjRefPool
,
pInfo
);
return
initDataSource
(
numOfSources
,
pInfo
,
id
);
}
}
SOperatorInfo
*
createExchangeOperatorInfo
(
void
*
pTransporter
,
SExchangePhysiNode
*
pExNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SOperatorInfo
*
createExchangeOperatorInfo
(
void
*
pTransporter
,
SExchangePhysiNode
*
pExNode
,
SExecTaskInfo
*
pTaskInfo
)
{
...
@@ -2939,18 +2896,18 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
...
@@ -2939,18 +2896,18 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
goto
_error
;
goto
_error
;
}
}
pInfo
->
seqLoadData
=
false
;
pInfo
->
pTransporter
=
pTransporter
;
pInfo
->
pResult
=
createResDataBlock
(
pExNode
->
node
.
pOutputDataBlockDesc
);
tsem_init
(
&
pInfo
->
ready
,
0
,
0
);
tsem_init
(
&
pInfo
->
ready
,
0
,
0
);
pOperator
->
name
=
"ExchangeOperator"
;
pInfo
->
seqLoadData
=
false
;
pInfo
->
pTransporter
=
pTransporter
;
pInfo
->
pResult
=
createResDataBlock
(
pExNode
->
node
.
pOutputDataBlockDesc
);
pOperator
->
name
=
"ExchangeOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
;
pOperator
->
blocking
=
false
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
info
=
pInfo
;
pOperator
->
numOfExprs
=
pInfo
->
pResult
->
info
.
numOfCols
;
pOperator
->
numOfExprs
=
pInfo
->
pResult
->
info
.
numOfCols
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
prepareLoadRemoteData
,
doLoadRemoteData
,
NULL
,
NULL
,
pOperator
->
fpSet
=
createOperatorFpSet
(
prepareLoadRemoteData
,
doLoadRemoteData
,
NULL
,
NULL
,
destroyExchangeOperatorInfo
,
NULL
,
NULL
,
NULL
);
destroyExchangeOperatorInfo
,
NULL
,
NULL
,
NULL
);
...
@@ -2958,7 +2915,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
...
@@ -2958,7 +2915,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
_error:
_error:
if
(
pInfo
!=
NULL
)
{
if
(
pInfo
!=
NULL
)
{
d
estroyExchangeOperatorInfo
(
pInfo
,
LIST_LENGTH
(
pExNode
->
pSrcEndPoints
)
);
d
oDestroyExchangeOperatorInfo
(
pInfo
);
}
}
taosMemoryFreeClear
(
pInfo
);
taosMemoryFreeClear
(
pInfo
);
...
@@ -4105,6 +4062,12 @@ static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) {
...
@@ -4105,6 +4062,12 @@ static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) {
void
destroyExchangeOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
void
destroyExchangeOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SExchangeInfo
*
pExInfo
=
(
SExchangeInfo
*
)
param
;
SExchangeInfo
*
pExInfo
=
(
SExchangeInfo
*
)
param
;
taosRemoveRef
(
exchangeObjRefPool
,
pExInfo
->
self
);
}
void
doDestroyExchangeOperatorInfo
(
void
*
param
)
{
SExchangeInfo
*
pExInfo
=
(
SExchangeInfo
*
)
param
;
taosArrayDestroy
(
pExInfo
->
pSources
);
taosArrayDestroy
(
pExInfo
->
pSources
);
taosArrayDestroy
(
pExInfo
->
pSourceDataInfo
);
taosArrayDestroy
(
pExInfo
->
pSourceDataInfo
);
if
(
pExInfo
->
pResult
!=
NULL
)
{
if
(
pExInfo
->
pResult
!=
NULL
)
{
...
@@ -4322,6 +4285,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
...
@@ -4322,6 +4285,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
pInfo
->
p
=
taosMemoryCalloc
(
numOfCols
,
POINTER_BYTES
);
pInfo
->
p
=
taosMemoryCalloc
(
numOfCols
,
POINTER_BYTES
);
if
(
pInfo
->
pFillInfo
==
NULL
||
pInfo
->
p
==
NULL
)
{
if
(
pInfo
->
pFillInfo
==
NULL
||
pInfo
->
p
==
NULL
)
{
taosMemoryFree
(
pInfo
->
pFillInfo
);
taosMemoryFree
(
pInfo
->
p
);
return
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_OUT_OF_MEMORY
;
}
else
{
}
else
{
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
@@ -4846,11 +4811,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
...
@@ -4846,11 +4811,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t
tsSlotId
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
slotId
;
int32_t
tsSlotId
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
slotId
;
pOptr
=
createMergeIntervalOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
&
interval
,
tsSlotId
,
pTaskInfo
);
pOptr
=
createMergeIntervalOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
&
interval
,
tsSlotId
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
==
type
)
{
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
==
type
)
{
qDebug
(
"[******]create Semi"
);
int32_t
children
=
0
;
int32_t
children
=
0
;
pOptr
=
createStreamFinalIntervalOperatorInfo
(
ops
[
0
],
pPhyNode
,
pTaskInfo
,
children
);
pOptr
=
createStreamFinalIntervalOperatorInfo
(
ops
[
0
],
pPhyNode
,
pTaskInfo
,
children
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
==
type
)
{
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
==
type
)
{
qDebug
(
"[******]create Final"
);
int32_t
children
=
1
;
int32_t
children
=
1
;
pOptr
=
createStreamFinalIntervalOperatorInfo
(
ops
[
0
],
pPhyNode
,
pTaskInfo
,
children
);
pOptr
=
createStreamFinalIntervalOperatorInfo
(
ops
[
0
],
pPhyNode
,
pTaskInfo
,
children
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_SORT
==
type
)
{
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_SORT
==
type
)
{
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
efd28728
...
@@ -332,12 +332,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
...
@@ -332,12 +332,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
pOperator
->
status
=
OP_RES_TO_RETURN
;
pOperator
->
status
=
OP_RES_TO_RETURN
;
closeAllResultRows
(
&
pInfo
->
binfo
.
resultRowInfo
);
closeAllResultRows
(
&
pInfo
->
binfo
.
resultRowInfo
);
// if (!stableQuery) { // finalize include the update of result rows
// finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs);
// } else {
// updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfExprs, &pInfo->binfo.resultRowInfo,
// pInfo->binfo.rowCellInfoOffset);
// }
#if 0
#if 0
if(pOperator->fpSet.encodeResultRow){
if(pOperator->fpSet.encodeResultRow){
char *result = NULL;
char *result = NULL;
...
@@ -378,8 +373,9 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
...
@@ -378,8 +373,9 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
return
(
rows
==
0
)
?
NULL
:
pRes
;
return
(
rows
==
0
)
?
NULL
:
pRes
;
}
}
SOperatorInfo
*
createGroupOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SArray
*
pGroupColList
,
SOperatorInfo
*
createGroupOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SNode
*
pCondition
,
SExprInfo
*
pScalarExprInfo
,
int32_t
numOfScalarExpr
,
SExecTaskInfo
*
pTaskInfo
)
{
SSDataBlock
*
pResultBlock
,
SArray
*
pGroupColList
,
SNode
*
pCondition
,
SExprInfo
*
pScalarExprInfo
,
int32_t
numOfScalarExpr
,
SExecTaskInfo
*
pTaskInfo
)
{
SGroupbyOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SGroupbyOperatorInfo
));
SGroupbyOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SGroupbyOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
...
@@ -407,7 +403,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
...
@@ -407,7 +403,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
status
=
OP_NOT_OPENED
;
// pOperator->operatorType = OP_Groupby;
// pOperator->operatorType = OP_Groupby;
pOperator
->
pExpr
=
pExprInfo
;
pOperator
->
pExpr
=
pExprInfo
;
pOperator
->
numOfExprs
=
numOfCols
;
pOperator
->
numOfExprs
=
numOfCols
;
pOperator
->
info
=
pInfo
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
...
@@ -669,8 +665,8 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
...
@@ -669,8 +665,8 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFree
(
pInfo
->
columnOffset
);
taosMemoryFree
(
pInfo
->
columnOffset
);
}
}
SOperatorInfo
*
createPartitionOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SArray
*
pGroupColList
,
SOperatorInfo
*
createPartitionOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SExecTaskInfo
*
pTaskInfo
)
{
S
SDataBlock
*
pResultBlock
,
SArray
*
pGroupColList
,
S
ExecTaskInfo
*
pTaskInfo
)
{
SPartitionOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SPartitionOperatorInfo
));
SPartitionOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SPartitionOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
efd28728
...
@@ -17,6 +17,7 @@
...
@@ -17,6 +17,7 @@
#include "functionMgt.h"
#include "functionMgt.h"
#include "tdatablock.h"
#include "tdatablock.h"
#include "ttime.h"
#include "ttime.h"
#include "tfill.h"
typedef
enum
SResultTsInterpType
{
typedef
enum
SResultTsInterpType
{
RESULT_ROW_START_INTERP
=
1
,
RESULT_ROW_START_INTERP
=
1
,
...
@@ -30,18 +31,18 @@ static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo
...
@@ -30,18 +31,18 @@ static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo
static
SResultRowPosition
addToOpenWindowList
(
SResultRowInfo
*
pResultRowInfo
,
const
SResultRow
*
pResult
);
static
SResultRowPosition
addToOpenWindowList
(
SResultRowInfo
*
pResultRowInfo
,
const
SResultRow
*
pResult
);
static
void
doCloseWindow
(
SResultRowInfo
*
pResultRowInfo
,
const
SIntervalAggOperatorInfo
*
pInfo
,
SResultRow
*
pResult
);
static
void
doCloseWindow
(
SResultRowInfo
*
pResultRowInfo
,
const
SIntervalAggOperatorInfo
*
pInfo
,
SResultRow
*
pResult
);
/*
/
//
*
* There are two cases to handle:
//
* There are two cases to handle:
*
//
*
* 1. Query range is not set yet (queryRangeSet = 0). we need to set the query range info, including
//
* 1. Query range is not set yet (queryRangeSet = 0). we need to set the query range info, including
* pQueryAttr->lastKey, pQueryAttr->window.skey, and pQueryAttr->eKey.
//
* pQueryAttr->lastKey, pQueryAttr->window.skey, and pQueryAttr->eKey.
* 2. Query range is set and query is in progress. There may be another result with the same query ranges to be
//
* 2. Query range is set and query is in progress. There may be another result with the same query ranges to be
* merged during merge stage. In this case, we need the pTableQueryInfo->lastResRows to decide if there
//
* merged during merge stage. In this case, we need the pTableQueryInfo->lastResRows to decide if there
* is a previous result generated or not.
//
* is a previous result generated or not.
*/
//
*/
static
void
setIntervalQueryRange
(
STableQueryInfo
*
pTableQueryInfo
,
TSKEY
key
,
STimeWindow
*
pQRange
)
{
//
static void setIntervalQueryRange(STableQueryInfo* pTableQueryInfo, TSKEY key, STimeWindow* pQRange) {
// do nothing
//
// do nothing
}
//
}
static
TSKEY
getStartTsKey
(
STimeWindow
*
win
,
const
TSKEY
*
tsCols
)
{
return
tsCols
==
NULL
?
win
->
skey
:
tsCols
[
0
];
}
static
TSKEY
getStartTsKey
(
STimeWindow
*
win
,
const
TSKEY
*
tsCols
)
{
return
tsCols
==
NULL
?
win
->
skey
:
tsCols
[
0
];
}
...
@@ -327,8 +328,7 @@ void doTimeWindowInterpolation(SIntervalAggOperatorInfo* pInfo, int32_t numOfExp
...
@@ -327,8 +328,7 @@ void doTimeWindowInterpolation(SIntervalAggOperatorInfo* pInfo, int32_t numOfExp
int32_t
index
=
1
;
int32_t
index
=
1
;
for
(
int32_t
k
=
0
;
k
<
numOfExprs
;
++
k
)
{
for
(
int32_t
k
=
0
;
k
<
numOfExprs
;
++
k
)
{
// todo use flag instead of function name
if
(
!
fmIsIntervalInterpoFunc
(
pCtx
[
k
].
functionId
))
{
if
(
strcmp
(
pCtx
[
k
].
pExpr
->
pExpr
->
_function
.
functionName
,
"twa"
)
!=
0
)
{
pCtx
[
k
].
start
.
key
=
INT64_MIN
;
pCtx
[
k
].
start
.
key
=
INT64_MIN
;
continue
;
continue
;
}
}
...
@@ -341,7 +341,7 @@ void doTimeWindowInterpolation(SIntervalAggOperatorInfo* pInfo, int32_t numOfExp
...
@@ -341,7 +341,7 @@ void doTimeWindowInterpolation(SIntervalAggOperatorInfo* pInfo, int32_t numOfExp
SFunctParam
*
pParam
=
&
pCtx
[
k
].
param
[
0
];
SFunctParam
*
pParam
=
&
pCtx
[
k
].
param
[
0
];
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pDataBlock
,
pParam
->
pCol
->
slotId
);
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pDataBlock
,
pParam
->
pCol
->
slotId
);
ASSERT
(
pColInfo
->
info
.
colId
==
pParam
->
pCol
->
colId
&&
curTs
!=
windowKey
);
ASSERT
(
pColInfo
->
info
.
type
==
pParam
->
pCol
->
type
&&
curTs
!=
windowKey
);
double
v1
=
0
,
v2
=
0
,
v
=
0
;
double
v1
=
0
,
v2
=
0
,
v
=
0
;
if
(
prevRowIndex
==
-
1
)
{
if
(
prevRowIndex
==
-
1
)
{
...
@@ -958,9 +958,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -958,9 +958,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pInfo
->
binfo
.
pCtx
,
pBlock
,
pInfo
->
order
,
scanFlag
,
true
);
setInputDataBlock
(
pOperator
,
pInfo
->
binfo
.
pCtx
,
pBlock
,
pInfo
->
order
,
scanFlag
,
true
);
STableQueryInfo
*
pTableQueryInfo
=
pInfo
->
pCurrent
;
setIntervalQueryRange
(
pTableQueryInfo
,
pBlock
->
info
.
window
.
skey
,
&
pTaskInfo
->
window
);
hashIntervalAgg
(
pOperator
,
&
pInfo
->
binfo
.
resultRowInfo
,
pBlock
,
scanFlag
,
NULL
);
hashIntervalAgg
(
pOperator
,
&
pInfo
->
binfo
.
resultRowInfo
,
pBlock
,
scanFlag
,
NULL
);
#if 0 // test for encode/decode result info
#if 0 // test for encode/decode result info
...
@@ -1414,7 +1411,7 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
...
@@ -1414,7 +1411,7 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SExprInfo
*
pExpr
=
pCtx
[
i
].
pExpr
;
SExprInfo
*
pExpr
=
pCtx
[
i
].
pExpr
;
if
(
strcmp
(
pExpr
->
pExpr
->
_function
.
functionName
,
"twa"
)
==
0
)
{
if
(
fmIsIntervalInterpoFunc
(
pCtx
[
i
].
functionId
)
)
{
SFunctParam
*
pParam
=
&
pExpr
->
base
.
pParam
[
0
];
SFunctParam
*
pParam
=
&
pExpr
->
base
.
pParam
[
0
];
SColumn
c
=
*
pParam
->
pCol
;
SColumn
c
=
*
pParam
->
pCol
;
...
@@ -1475,11 +1472,9 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
...
@@ -1475,11 +1472,9 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo
->
timeWindowInterpo
=
timeWindowinterpNeeded
(
pInfo
->
binfo
.
pCtx
,
numOfCols
,
pInfo
);
pInfo
->
timeWindowInterpo
=
timeWindowinterpNeeded
(
pInfo
->
binfo
.
pCtx
,
numOfCols
,
pInfo
);
if
(
pInfo
->
timeWindowInterpo
)
{
if
(
pInfo
->
timeWindowInterpo
)
{
pInfo
->
binfo
.
resultRowInfo
.
openWindow
=
tdListNew
(
sizeof
(
SResultRowPosition
));
pInfo
->
binfo
.
resultRowInfo
.
openWindow
=
tdListNew
(
sizeof
(
SResultRowPosition
));
}
if
(
pInfo
->
binfo
.
resultRowInfo
.
openWindow
==
NULL
)
{
goto
_error
;
// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
}
if
(
code
!=
TSDB_CODE_SUCCESS
/* || pInfo->pTableQueryInfo == NULL*/
)
{
goto
_error
;
}
}
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
(
int32_t
)
1
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
(
int32_t
)
1
);
...
@@ -1694,24 +1689,44 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
...
@@ -1694,24 +1689,44 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
return
(
rows
==
0
)
?
NULL
:
pBInfo
->
pRes
;
return
(
rows
==
0
)
?
NULL
:
pBInfo
->
pRes
;
}
}
static
SSDataBlock
*
doAllIntervalAgg
(
SOperatorInfo
*
pOperator
)
{
static
void
doKeepPrevRows
(
STimeSliceOperatorInfo
*
pSliceInfo
,
const
SSDataBlock
*
pBlock
)
{
int32_t
numOfCols
=
taosArrayGetSize
(
pBlock
->
pDataBlock
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
i
);
// null data should not be kept since it can not be used to perform interpolation
if
(
!
colDataIsNull_s
(
pColInfoData
,
i
))
{
SGroupKeys
*
pkey
=
taosArrayGet
(
pSliceInfo
->
pPrevRow
,
i
);
pkey
->
isNull
=
false
;
char
*
val
=
colDataGetData
(
pColInfoData
,
i
);
memcpy
(
pkey
->
pData
,
val
,
pkey
->
bytes
);
}
}
}
static
SSDataBlock
*
doTimeslice
(
SOperatorInfo
*
pOperator
)
{
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
return
NULL
;
}
}
STimeSliceOperatorInfo
*
pSliceInfo
=
pOperator
->
info
;
STimeSliceOperatorInfo
*
pSliceInfo
=
pOperator
->
info
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
SSDataBlock
*
pResBlock
=
pSliceInfo
->
binfo
.
pRes
;
// doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
if
(
pSliceInfo
->
binfo
.
pRes
->
info
.
rows
==
0
||
!
hashRemainDataInGroupInfo
(
&
pSliceInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
}
return
pSliceInfo
->
binfo
.
pRes
;
// if (pOperator->status == OP_RES_TO_RETURN) {
}
// // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
// if (pResBlock->info.rows == 0 || !hashRemainDataInGroupInfo(&pSliceInfo->groupResInfo)) {
// doSetOperatorCompleted(pOperator);
// }
//
// return pResBlock;
// }
int32_t
order
=
TSDB_ORDER_ASC
;
int32_t
order
=
TSDB_ORDER_ASC
;
SInterval
*
pInterval
=
&
pSliceInfo
->
interval
;
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
int32_t
numOfRows
=
0
;
while
(
1
)
{
while
(
1
)
{
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
if
(
pBlock
==
NULL
)
{
if
(
pBlock
==
NULL
)
{
...
@@ -1720,48 +1735,198 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -1720,48 +1735,198 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pSliceInfo
->
binfo
.
pCtx
,
pBlock
,
order
,
MAIN_SCAN
,
true
);
setInputDataBlock
(
pOperator
,
pSliceInfo
->
binfo
.
pCtx
,
pBlock
,
order
,
MAIN_SCAN
,
true
);
// hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0);
SColumnInfoData
*
pTsCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
0
);
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
++
i
)
{
int64_t
ts
=
*
(
int64_t
*
)
colDataGetData
(
pTsCol
,
i
);
if
(
ts
==
pSliceInfo
->
current
)
{
for
(
int32_t
j
=
0
;
j
<
pOperator
->
numOfExprs
;
++
j
)
{
SExprInfo
*
pExprInfo
=
&
pOperator
->
pExpr
[
j
];
int32_t
dstSlot
=
pExprInfo
->
base
.
resSchema
.
slotId
;
int32_t
srcSlot
=
pExprInfo
->
base
.
pParam
[
0
].
pCol
->
slotId
;
SColumnInfoData
*
pSrc
=
taosArrayGet
(
pBlock
->
pDataBlock
,
srcSlot
);
SColumnInfoData
*
pDst
=
taosArrayGet
(
pBlock
->
pDataBlock
,
dstSlot
);
char
*
v
=
colDataGetData
(
pSrc
,
i
);
colDataAppend
(
pDst
,
numOfRows
,
v
,
false
);
}
numOfRows
+=
1
;
pSliceInfo
->
current
+=
taosTimeAdd
(
pSliceInfo
->
current
,
pInterval
->
interval
,
pInterval
->
intervalUnit
,
pInterval
->
precision
);
if
(
pSliceInfo
->
current
>
pSliceInfo
->
win
.
ekey
)
{
doSetOperatorCompleted
(
pOperator
);
break
;
}
}
else
if
(
ts
<
pSliceInfo
->
current
)
{
if
(
i
!=
pBlock
->
info
.
window
.
ekey
)
{
int64_t
nextTs
=
*
(
int64_t
*
)
colDataGetData
(
pTsCol
,
i
+
1
);
if
(
nextTs
>
pSliceInfo
->
current
)
{
// output the result
for
(
int32_t
j
=
0
;
j
<
pOperator
->
numOfExprs
;
++
j
)
{
SExprInfo
*
pExprInfo
=
&
pOperator
->
pExpr
[
j
];
int32_t
dstSlot
=
pExprInfo
->
base
.
resSchema
.
slotId
;
int32_t
srcSlot
=
pExprInfo
->
base
.
pParam
[
0
].
pCol
->
slotId
;
SColumnInfoData
*
pSrc
=
taosArrayGet
(
pBlock
->
pDataBlock
,
srcSlot
);
SColumnInfoData
*
pDst
=
taosArrayGet
(
pBlock
->
pDataBlock
,
dstSlot
);
switch
(
pSliceInfo
->
fillType
)
{
case
TSDB_FILL_NULL
:
colDataAppendNULL
(
pDst
,
numOfRows
);
break
;
case
TSDB_FILL_SET_VALUE
:
{
SVariant
*
pVar
=
&
pSliceInfo
->
pFillColInfo
[
i
].
fillVal
;
if
(
pDst
->
info
.
type
==
TSDB_DATA_TYPE_FLOAT
)
{
float
v
=
0
;
GET_TYPED_DATA
(
v
,
float
,
pVar
->
nType
,
&
pVar
->
i
);
colDataAppend
(
pDst
,
numOfRows
,
(
char
*
)
&
v
,
false
);
}
else
if
(
pDst
->
info
.
type
==
TSDB_DATA_TYPE_DOUBLE
)
{
double
v
=
0
;
GET_TYPED_DATA
(
v
,
double
,
pVar
->
nType
,
&
pVar
->
i
);
colDataAppend
(
pDst
,
numOfRows
,
(
char
*
)
&
v
,
false
);
}
else
if
(
IS_SIGNED_NUMERIC_TYPE
(
pDst
->
info
.
type
))
{
int64_t
v
=
0
;
GET_TYPED_DATA
(
v
,
int64_t
,
pVar
->
nType
,
&
pVar
->
i
);
colDataAppend
(
pDst
,
numOfRows
,
(
char
*
)
&
v
,
false
);
}
}
break
;
case
TSDB_FILL_LINEAR
:
#if 0
if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs
|| pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) {
// goto interp_exit;
}
double v1 = -1, v2 = -1;
GET_TYPED_DATA(v1, double, pCtx->inputType, &pCtx->start.val);
GET_TYPED_DATA(v2, double, pCtx->inputType, &pCtx->end.val);
SPoint point1 = {.key = ts, .val = &v1};
SPoint point2 = {.key = nextTs, .val = &v2};
SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput};
int32_t srcType = pCtx->inputType;
if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) {
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
} else {
bool exceedMax = false, exceedMin = false;
taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, &exceedMax, &exceedMin);
if (exceedMax || exceedMin) {
__compar_fn_t func = getComparFunc((int32_t)pCtx->inputType, 0);
if (func(&pCtx->start.val, &pCtx->end.val) <= 0) {
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->start.val : &pCtx->end.val);
} else {
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->end.val : &pCtx->start.val);
}
}
}
#endif
break
;
case
TSDB_FILL_PREV
:
{
SGroupKeys
*
pkey
=
taosArrayGet
(
pSliceInfo
->
pPrevRow
,
srcSlot
);
colDataAppend
(
pDst
,
numOfRows
,
pkey
->
pData
,
false
);
}
break
;
case
TSDB_FILL_NEXT
:
{
}
break
;
case
TSDB_FILL_NONE
:
default:
break
;
}
pSliceInfo
->
current
+=
taosTimeAdd
(
pSliceInfo
->
current
,
pInterval
->
interval
,
pInterval
->
intervalUnit
,
pInterval
->
precision
);
if
(
pSliceInfo
->
current
>
pSliceInfo
->
win
.
ekey
)
{
doSetOperatorCompleted
(
pOperator
);
break
;
}
}
}
else
{
// ignore current row, and do nothing
}
}
else
{
// it is the last row of current block
doKeepPrevRows
(
pSliceInfo
,
pBlock
);
}
}
}
}
}
// restore the value
// restore the value
pOperator
->
status
=
OP_RES_TO_RETURN
;
closeAllResultRows
(
&
pSliceInfo
->
binfo
.
resultRowInfo
);
setTaskStatus
(
pOperator
->
pTaskInfo
,
TASK_COMPLETED
);
setTaskStatus
(
pOperator
->
pTaskInfo
,
TASK_COMPLETED
);
// finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfExprs);
if
(
pResBlock
->
info
.
rows
==
0
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
}
// initGroupedResultInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo)
;
return
pResBlock
->
info
.
rows
==
0
?
NULL
:
pResBlock
;
// doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes);
}
if
(
pSliceInfo
->
binfo
.
pRes
->
info
.
rows
==
0
||
!
hashRemainDataInGroupInfo
(
&
pSliceInfo
->
groupResInfo
))
{
static
int32_t
initTimesliceInfo
(
STimeSliceOperatorInfo
*
pInfo
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfCols
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
pInfo
->
pPrevRow
=
taosArrayInit
(
4
,
sizeof
(
SGroupKeys
));
pInfo
->
pCols
=
taosArrayInit
(
4
,
sizeof
(
SColumn
));
if
(
pInfo
->
pPrevRow
==
NULL
||
pInfo
->
pCols
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SExprInfo
*
pExpr
=
pCtx
[
i
].
pExpr
;
SFunctParam
*
pParam
=
&
pExpr
->
base
.
pParam
[
0
];
SColumn
c
=
*
pParam
->
pCol
;
taosArrayPush
(
pInfo
->
pCols
,
&
c
);
SGroupKeys
key
=
{
0
};
key
.
bytes
=
c
.
bytes
;
key
.
type
=
c
.
type
;
key
.
isNull
=
false
;
key
.
pData
=
taosMemoryCalloc
(
1
,
c
.
bytes
);
taosArrayPush
(
pInfo
->
pPrevRow
,
&
key
);
}
}
return
pSliceInfo
->
binfo
.
pRes
->
info
.
rows
==
0
?
NULL
:
pSliceInfo
->
binfo
.
pRes
;
return
TSDB_CODE_SUCCESS
;
}
}
SOperatorInfo
*
createTimeSliceOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SOperatorInfo
*
createTimeSliceOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SExecTaskInfo
*
pTaskInfo
)
{
SSDataBlock
*
pResultBlock
,
const
SNodeListNode
*
pValNode
,
SExecTaskInfo
*
pTaskInfo
)
{
STimeSliceOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
STimeSliceOperatorInfo
));
STimeSliceOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
STimeSliceOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pOperator
==
NULL
||
pInfo
==
NULL
)
{
if
(
pOperator
==
NULL
||
pInfo
==
NULL
)
{
goto
_error
;
goto
_error
;
}
}
int32_t
code
=
initTimesliceInfo
(
pInfo
,
pInfo
->
binfo
.
pCtx
,
numOfCols
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
8
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
8
);
pInfo
->
pFillColInfo
=
createFillColInfo
(
pExprInfo
,
numOfCols
,
pValNode
);
pInfo
->
binfo
.
pRes
=
pResultBlock
;
pOperator
->
name
=
"TimeSliceOperator"
;
pOperator
->
name
=
"TimeSliceOperator"
;
// pOperator->operatorType = OP_AllTimeWindow;
// pOperator->operatorType = OP_AllTimeWindow;
pOperator
->
blocking
=
true
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
pExpr
=
pExprInfo
;
pOperator
->
pExpr
=
pExprInfo
;
pOperator
->
numOfExprs
=
numOfCols
;
pOperator
->
numOfExprs
=
numOfCols
;
pOperator
->
info
=
pInfo
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
do
AllIntervalAgg
,
NULL
,
NULL
,
destroyBasicOperatorInfo
,
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
do
Timeslice
,
NULL
,
NULL
,
destroyBasicOperatorInfo
,
NULL
,
NULL
,
NULL
);
NULL
,
NULL
,
NULL
);
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
return
pOperator
;
_error:
_error:
...
@@ -3344,9 +3509,6 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -3344,9 +3509,6 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
getTableScanInfo
(
pOperator
,
&
iaInfo
->
order
,
&
scanFlag
);
getTableScanInfo
(
pOperator
,
&
iaInfo
->
order
,
&
scanFlag
);
setInputDataBlock
(
pOperator
,
iaInfo
->
binfo
.
pCtx
,
pBlock
,
iaInfo
->
order
,
scanFlag
,
true
);
setInputDataBlock
(
pOperator
,
iaInfo
->
binfo
.
pCtx
,
pBlock
,
iaInfo
->
order
,
scanFlag
,
true
);
STableQueryInfo
*
pTableQueryInfo
=
iaInfo
->
pCurrent
;
setIntervalQueryRange
(
pTableQueryInfo
,
pBlock
->
info
.
window
.
skey
,
&
pTaskInfo
->
window
);
doMergeIntervalAggImpl
(
pOperator
,
&
iaInfo
->
binfo
.
resultRowInfo
,
pBlock
,
scanFlag
,
pRes
);
doMergeIntervalAggImpl
(
pOperator
,
&
iaInfo
->
binfo
.
resultRowInfo
,
pBlock
,
scanFlag
,
pRes
);
if
(
pRes
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
)
{
if
(
pRes
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
)
{
...
...
source/libs/function/inc/builtinsimpl.h
浏览文件 @
efd28728
...
@@ -97,6 +97,10 @@ bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
...
@@ -97,6 +97,10 @@ bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool
diffFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResInfo
);
bool
diffFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResInfo
);
int32_t
diffFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
diffFunction
(
SqlFunctionCtx
*
pCtx
);
bool
getDerivativeFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
derivativeFuncSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResInfo
);
int32_t
derivativeFunction
(
SqlFunctionCtx
*
pCtx
);
bool
getFirstLastFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
getFirstLastFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
int32_t
firstFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
firstFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
firstFunctionMerge
(
SqlFunctionCtx
*
pCtx
);
int32_t
firstFunctionMerge
(
SqlFunctionCtx
*
pCtx
);
...
...
source/libs/function/inc/functionMgtInt.h
浏览文件 @
efd28728
...
@@ -42,7 +42,8 @@ extern "C" {
...
@@ -42,7 +42,8 @@ extern "C" {
#define FUNC_MGT_SELECT_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(13)
#define FUNC_MGT_SELECT_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(13)
#define FUNC_MGT_REPEAT_SCAN_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(14)
#define FUNC_MGT_REPEAT_SCAN_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(14)
#define FUNC_MGT_FORBID_FILL_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(15)
#define FUNC_MGT_FORBID_FILL_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(15)
#define FUNC_MGT_FORBID_STREAM_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(16)
#define FUNC_MGT_INTERVAL_INTERPO_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(16)
#define FUNC_MGT_FORBID_STREAM_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(17)
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
...
...
source/libs/function/inc/texpr.h
浏览文件 @
efd28728
...
@@ -60,12 +60,6 @@ typedef struct SExprTraverseSupp {
...
@@ -60,12 +60,6 @@ typedef struct SExprTraverseSupp {
void
*
pExtInfo
;
void
*
pExtInfo
;
}
SExprTraverseSupp
;
}
SExprTraverseSupp
;
tExprNode
*
exprTreeFromTableName
(
const
char
*
tbnameCond
);
bool
exprTreeApplyFilter
(
tExprNode
*
pExpr
,
const
void
*
pItem
,
SExprTraverseSupp
*
param
);
void
buildFilterSetFromBinary
(
void
**
q
,
const
char
*
buf
,
int32_t
len
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
source/libs/function/inc/tunaryoperator.h
已删除
100644 → 0
浏览文件 @
39ac85b1
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_COMMON_UNARY_SCALAR_OPERATOR_H_
#define _TD_COMMON_UNARY_SCALAR_OPERATOR_H_
#ifdef __cplusplus
extern
"C"
{
#endif
//#include "tscalarfunction.h"
//typedef void (*_unary_scalar_fn_t)(SScalarParam *pLeft, SScalarParam* pOutput);
//_unary_scalar_fn_t getUnaryScalarOperatorFn(int32_t binOperator);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_COMMON_BIN_SCALAR_OPERATOR_H_*/
source/libs/function/src/builtins.c
浏览文件 @
efd28728
...
@@ -997,6 +997,38 @@ static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len
...
@@ -997,6 +997,38 @@ static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
int32_t
translateDerivative
(
SFunctionNode
*
pFunc
,
char
*
pErrBuf
,
int32_t
len
)
{
if
(
3
!=
LIST_LENGTH
(
pFunc
->
pParameterList
))
{
return
invaildFuncParaNumErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
uint8_t
colType
=
((
SExprNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
0
))
->
resType
.
type
;
// param1
SNode
*
pParamNode1
=
nodesListGetNode
(
pFunc
->
pParameterList
,
1
);
if
(
QUERY_NODE_VALUE
!=
nodeType
(
pParamNode1
))
{
return
invaildFuncParaTypeErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
SValueNode
*
pValue
=
(
SValueNode
*
)
pParamNode1
;
pValue
->
notReserved
=
true
;
if
(
!
IS_NUMERIC_TYPE
(
colType
))
{
return
invaildFuncParaTypeErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
SNode
*
pParamNode2
=
nodesListGetNode
(
pFunc
->
pParameterList
,
2
);
SValueNode
*
pValue2
=
(
SValueNode
*
)
pParamNode2
;
pValue2
->
notReserved
=
true
;
if
(
pValue2
->
datum
.
i
!=
0
&&
pValue2
->
datum
.
i
!=
1
)
{
return
invaildFuncParaValueErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
pFunc
->
node
.
resType
=
(
SDataType
){.
bytes
=
tDataTypes
[
TSDB_DATA_TYPE_DOUBLE
].
bytes
,
.
type
=
TSDB_DATA_TYPE_DOUBLE
};
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
translateFirstLast
(
SFunctionNode
*
pFunc
,
char
*
pErrBuf
,
int32_t
len
)
{
static
int32_t
translateFirstLast
(
SFunctionNode
*
pFunc
,
char
*
pErrBuf
,
int32_t
len
)
{
// first(col_list) will be rewritten as first(col)
// first(col_list) will be rewritten as first(col)
if
(
1
!=
LIST_LENGTH
(
pFunc
->
pParameterList
))
{
if
(
1
!=
LIST_LENGTH
(
pFunc
->
pParameterList
))
{
...
@@ -1596,6 +1628,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
...
@@ -1596,6 +1628,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
type
=
FUNCTION_TYPE_AVG
,
.
type
=
FUNCTION_TYPE_AVG
,
.
classification
=
FUNC_MGT_AGG_FUNC
,
.
classification
=
FUNC_MGT_AGG_FUNC
,
.
translateFunc
=
translateInNumOutDou
,
.
translateFunc
=
translateInNumOutDou
,
.
dataRequiredFunc
=
statisDataRequired
,
.
getEnvFunc
=
getAvgFuncEnv
,
.
getEnvFunc
=
getAvgFuncEnv
,
.
initFunc
=
avgFunctionSetup
,
.
initFunc
=
avgFunctionSetup
,
.
processFunc
=
avgFunction
,
.
processFunc
=
avgFunction
,
...
@@ -1793,7 +1826,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
...
@@ -1793,7 +1826,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
{
.
name
=
"elapsed"
,
.
name
=
"elapsed"
,
.
type
=
FUNCTION_TYPE_ELAPSED
,
.
type
=
FUNCTION_TYPE_ELAPSED
,
.
classification
=
FUNC_MGT_AGG_FUNC
|
FUNC_MGT_FORBID_STREAM_FUNC
,
.
classification
=
FUNC_MGT_AGG_FUNC
|
FUNC_MGT_
TIMELINE_FUNC
|
FUNC_MGT_INTERVAL_INTERPO_FUNC
|
FUNC_MGT_
FORBID_STREAM_FUNC
,
.
dataRequiredFunc
=
statisDataRequired
,
.
dataRequiredFunc
=
statisDataRequired
,
.
translateFunc
=
translateElapsed
,
.
translateFunc
=
translateElapsed
,
.
getEnvFunc
=
getElapsedFuncEnv
,
.
getEnvFunc
=
getElapsedFuncEnv
,
...
@@ -1831,6 +1864,26 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
...
@@ -1831,6 +1864,26 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
invertFunc
=
NULL
,
.
invertFunc
=
NULL
,
.
combineFunc
=
elapsedCombine
,
.
combineFunc
=
elapsedCombine
,
},
},
{
.
name
=
"interp"
,
.
type
=
FUNCTION_TYPE_INTERP
,
.
classification
=
FUNC_MGT_AGG_FUNC
|
FUNC_MGT_TIMELINE_FUNC
|
FUNC_MGT_INTERVAL_INTERPO_FUNC
,
.
translateFunc
=
translateFirstLast
,
.
getEnvFunc
=
getSelectivityFuncEnv
,
.
initFunc
=
functionSetup
,
.
processFunc
=
NULL
,
.
finalizeFunc
=
NULL
},
{
.
name
=
"derivative"
,
.
type
=
FUNCTION_TYPE_DERIVATIVE
,
.
classification
=
FUNC_MGT_INDEFINITE_ROWS_FUNC
|
FUNC_MGT_TIMELINE_FUNC
,
.
translateFunc
=
translateDerivative
,
.
getEnvFunc
=
getDerivativeFuncEnv
,
.
initFunc
=
derivativeFuncSetup
,
.
processFunc
=
derivativeFunction
,
.
finalizeFunc
=
functionFinalize
},
{
{
.
name
=
"last_row"
,
.
name
=
"last_row"
,
.
type
=
FUNCTION_TYPE_LAST_ROW
,
.
type
=
FUNCTION_TYPE_LAST_ROW
,
...
@@ -1914,8 +1967,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
...
@@ -1914,8 +1967,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
{
.
name
=
"twa"
,
.
name
=
"twa"
,
.
type
=
FUNCTION_TYPE_TWA
,
.
type
=
FUNCTION_TYPE_TWA
,
.
classification
=
FUNC_MGT_AGG_FUNC
|
FUNC_MGT_TIMELINE_FUNC
|
FUNC_MGT_FORBID_STREAM_FUNC
,
.
classification
=
FUNC_MGT_AGG_FUNC
|
FUNC_MGT_TIMELINE_FUNC
|
FUNC_MGT_
INTERVAL_INTERPO_FUNC
|
FUNC_MGT_
FORBID_STREAM_FUNC
,
.
translateFunc
=
translateInNumOutDou
,
.
translateFunc
=
translateInNumOutDou
,
.
dataRequiredFunc
=
statisDataRequired
,
.
getEnvFunc
=
getTwaFuncEnv
,
.
getEnvFunc
=
getTwaFuncEnv
,
.
initFunc
=
twaFunctionSetup
,
.
initFunc
=
twaFunctionSetup
,
.
processFunc
=
twaFunction
,
.
processFunc
=
twaFunction
,
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
efd28728
...
@@ -990,9 +990,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
...
@@ -990,9 +990,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
index
=
pInput
->
pColumnDataAgg
[
0
]
->
maxIndex
;
index
=
pInput
->
pColumnDataAgg
[
0
]
->
maxIndex
;
}
}
// the index is the original position, not the relative position
TSKEY
key
=
(
pCtx
->
ptsList
!=
NULL
)
?
pCtx
->
ptsList
[
index
]
:
TSKEY_INITIAL_VAL
;
if
(
!
pBuf
->
assign
)
{
if
(
!
pBuf
->
assign
)
{
pBuf
->
v
=
*
(
int64_t
*
)
tval
;
pBuf
->
v
=
*
(
int64_t
*
)
tval
;
if
(
pCtx
->
subsidiaries
.
num
>
0
)
{
if
(
pCtx
->
subsidiaries
.
num
>
0
)
{
...
@@ -3424,7 +3421,7 @@ bool elapsedFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo
...
@@ -3424,7 +3421,7 @@ bool elapsedFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo
pInfo
->
min
=
MAX_TS_KEY
;
pInfo
->
min
=
MAX_TS_KEY
;
pInfo
->
max
=
0
;
pInfo
->
max
=
0
;
if
(
pCtx
->
numOfParams
==
2
)
{
if
(
pCtx
->
numOfParams
>
2
)
{
pInfo
->
timeUnit
=
pCtx
->
param
[
1
].
param
.
i
;
pInfo
->
timeUnit
=
pCtx
->
param
[
1
].
param
.
i
;
}
else
{
}
else
{
pInfo
->
timeUnit
=
1
;
pInfo
->
timeUnit
=
1
;
...
@@ -3474,8 +3471,8 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) {
...
@@ -3474,8 +3471,8 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) {
SColumnInfoData
*
pCol
=
pInput
->
pData
[
0
];
SColumnInfoData
*
pCol
=
pInput
->
pData
[
0
];
int32_t
start
=
pInput
->
startRowIndex
;
int32_t
start
=
pInput
->
startRowIndex
;
TSKEY
*
ptsList
=
(
int64_t
*
)
colDataGetData
(
pCol
,
start
);
TSKEY
*
ptsList
=
(
int64_t
*
)
colDataGetData
(
pCol
,
0
);
if
(
pCtx
->
order
==
TSDB_ORDER_DESC
)
{
if
(
pCtx
->
order
==
TSDB_ORDER_DESC
)
{
if
(
pCtx
->
start
.
key
==
INT64_MIN
)
{
if
(
pCtx
->
start
.
key
==
INT64_MIN
)
{
pInfo
->
max
=
pInfo
->
max
=
...
@@ -5166,3 +5163,198 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
...
@@ -5166,3 +5163,198 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return
row
;
return
row
;
}
}
typedef
struct
SDerivInfo
{
double
prevValue
;
// previous value
TSKEY
prevTs
;
// previous timestamp
bool
ignoreNegative
;
// ignore the negative value
int64_t
tsWindow
;
// time window for derivative
bool
valueSet
;
// the value has been set already
}
SDerivInfo
;
bool
getDerivativeFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
)
{
pEnv
->
calcMemSize
=
sizeof
(
SDerivInfo
);
return
true
;
}
bool
derivativeFuncSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResInfo
)
{
if
(
!
functionSetup
(
pCtx
,
pResInfo
))
{
return
false
;
// not initialized since it has been initialized
}
SDerivInfo
*
pDerivInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
pDerivInfo
->
ignoreNegative
=
pCtx
->
param
[
2
].
param
.
i
;
pDerivInfo
->
prevTs
=
-
1
;
pDerivInfo
->
tsWindow
=
pCtx
->
param
[
1
].
param
.
i
;
pDerivInfo
->
valueSet
=
false
;
return
true
;
}
int32_t
derivativeFunction
(
SqlFunctionCtx
*
pCtx
)
{
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SDerivInfo
*
pDerivInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
SColumnInfoData
*
pInputCol
=
pInput
->
pData
[
0
];
int32_t
numOfElems
=
0
;
SColumnInfoData
*
pOutput
=
(
SColumnInfoData
*
)
pCtx
->
pOutput
;
SColumnInfoData
*
pTsOutput
=
pCtx
->
pTsOutput
;
int32_t
i
=
pInput
->
startRowIndex
;
TSKEY
*
tsList
=
(
int64_t
*
)
pInput
->
pPTS
->
pData
;
double
v
=
0
;
if
(
pCtx
->
order
==
TSDB_ORDER_ASC
)
{
for
(;
i
<
pInput
->
numOfRows
+
pInput
->
startRowIndex
;
i
+=
1
)
{
if
(
colDataIsNull_f
(
pInputCol
->
nullbitmap
,
i
))
{
continue
;
}
char
*
d
=
(
char
*
)
pInputCol
->
pData
+
pInputCol
->
info
.
bytes
*
i
;
GET_TYPED_DATA
(
v
,
double
,
pInputCol
->
info
.
type
,
d
);
int32_t
pos
=
pCtx
->
offset
+
numOfElems
;
if
(
!
pDerivInfo
->
valueSet
)
{
// initial value is not set yet
pDerivInfo
->
valueSet
=
true
;
}
else
{
double
r
=
((
v
-
pDerivInfo
->
prevValue
)
*
pDerivInfo
->
tsWindow
)
/
(
tsList
[
i
]
-
pDerivInfo
->
prevTs
);
if
(
pDerivInfo
->
ignoreNegative
&&
r
<
0
)
{
}
else
{
colDataAppend
(
pOutput
,
pos
,
(
const
char
*
)
&
r
,
false
);
if
(
pTsOutput
!=
NULL
)
{
colDataAppendInt64
(
pTsOutput
,
pos
,
&
tsList
[
i
]);
}
numOfElems
++
;
}
}
pDerivInfo
->
prevValue
=
v
;
pDerivInfo
->
prevTs
=
tsList
[
i
];
}
}
else
{
for
(;
i
<
pInput
->
numOfRows
+
pInput
->
startRowIndex
;
i
+=
1
)
{
if
(
colDataIsNull_f
(
pInputCol
->
nullbitmap
,
i
))
{
continue
;
}
char
*
d
=
(
char
*
)
pInputCol
->
pData
+
pInputCol
->
info
.
bytes
*
i
;
GET_TYPED_DATA
(
v
,
double
,
pInputCol
->
info
.
type
,
d
);
int32_t
pos
=
pCtx
->
offset
+
numOfElems
;
if
(
!
pDerivInfo
->
valueSet
)
{
// initial value is not set yet
pDerivInfo
->
valueSet
=
true
;
}
else
{
double
r
=
((
pDerivInfo
->
prevValue
-
v
)
*
pDerivInfo
->
tsWindow
)
/
(
pDerivInfo
->
prevTs
-
tsList
[
i
]);
if
(
pDerivInfo
->
ignoreNegative
&&
r
<
0
)
{
}
else
{
colDataAppend
(
pOutput
,
pos
,
(
const
char
*
)
&
r
,
false
);
if
(
pTsOutput
!=
NULL
)
{
colDataAppendInt64
(
pTsOutput
,
pos
,
&
pDerivInfo
->
prevTs
);
}
numOfElems
++
;
}
}
pDerivInfo
->
prevValue
=
v
;
pDerivInfo
->
prevTs
=
tsList
[
i
];
}
}
return
numOfElems
;
}
int32_t
interpFunction
(
SqlFunctionCtx
*
pCtx
)
{
#if 0
int32_t fillType = (int32_t) pCtx->param[2].i64;
//bool ascQuery = (pCtx->order == TSDB_ORDER_ASC);
if (pCtx->start.key == pCtx->startTs) {
assert(pCtx->start.key != INT64_MIN);
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->start.val);
goto interp_success_exit;
} else if (pCtx->end.key == pCtx->startTs && pCtx->end.key != INT64_MIN && fillType == TSDB_FILL_NEXT) {
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->end.val);
goto interp_success_exit;
}
switch (fillType) {
case TSDB_FILL_NULL:
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
break;
case TSDB_FILL_SET_VALUE:
tVariantDump(&pCtx->param[1], pCtx->pOutput, pCtx->inputType, true);
break;
case TSDB_FILL_LINEAR:
if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs
|| pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) {
goto interp_exit;
}
double v1 = -1, v2 = -1;
GET_TYPED_DATA(v1, double, pCtx->inputType, &pCtx->start.val);
GET_TYPED_DATA(v2, double, pCtx->inputType, &pCtx->end.val);
SPoint point1 = {.key = pCtx->start.key, .val = &v1};
SPoint point2 = {.key = pCtx->end.key, .val = &v2};
SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput};
int32_t srcType = pCtx->inputType;
if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) {
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
} else {
bool exceedMax = false, exceedMin = false;
taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, &exceedMax, &exceedMin);
if (exceedMax || exceedMin) {
__compar_fn_t func = getComparFunc((int32_t)pCtx->inputType, 0);
if (func(&pCtx->start.val, &pCtx->end.val) <= 0) {
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->start.val : &pCtx->end.val);
} else {
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->end.val : &pCtx->start.val);
}
}
}
break;
case TSDB_FILL_PREV:
if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs) {
goto interp_exit;
}
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->start.val);
break;
case TSDB_FILL_NEXT:
if (pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) {
goto interp_exit;
}
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->end.val);
break;
case TSDB_FILL_NONE:
// do nothing
default:
goto interp_exit;
}
interp_success_exit:
*(TSKEY*)pCtx->ptsOutputBuf = pCtx->startTs;
INC_INIT_VAL(pCtx, 1);
interp_exit:
pCtx->start.key = INT64_MIN;
pCtx->end.key = INT64_MIN;
pCtx->endTs = pCtx->startTs;
#endif
return
TSDB_CODE_SUCCESS
;
}
\ No newline at end of file
source/libs/function/src/functionMgt.c
浏览文件 @
efd28728
...
@@ -161,6 +161,8 @@ bool fmIsUserDefinedFunc(int32_t funcId) { return funcId > FUNC_UDF_ID_START; }
...
@@ -161,6 +161,8 @@ bool fmIsUserDefinedFunc(int32_t funcId) { return funcId > FUNC_UDF_ID_START; }
bool
fmIsForbidFillFunc
(
int32_t
funcId
)
{
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_FORBID_FILL_FUNC
);
}
bool
fmIsForbidFillFunc
(
int32_t
funcId
)
{
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_FORBID_FILL_FUNC
);
}
bool
fmIsIntervalInterpoFunc
(
int32_t
funcId
)
{
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_INTERVAL_INTERPO_FUNC
);
}
bool
fmIsForbidStreamFunc
(
int32_t
funcId
)
{
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_FORBID_STREAM_FUNC
);
}
bool
fmIsForbidStreamFunc
(
int32_t
funcId
)
{
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_FORBID_STREAM_FUNC
);
}
void
fmFuncMgtDestroy
()
{
void
fmFuncMgtDestroy
()
{
...
...
source/libs/function/src/taggfunction.c
浏览文件 @
efd28728
...
@@ -161,13 +161,13 @@ typedef struct SRateInfo {
...
@@ -161,13 +161,13 @@ typedef struct SRateInfo {
bool
isIRate
;
// true for IRate functions, false for Rate functions
bool
isIRate
;
// true for IRate functions, false for Rate functions
}
SRateInfo
;
}
SRateInfo
;
typedef
struct
SDerivInfo
{
//
typedef struct SDerivInfo {
double
prevValue
;
// previous value
//
double prevValue; // previous value
TSKEY
prevTs
;
// previous timestamp
//
TSKEY prevTs; // previous timestamp
bool
ignoreNegative
;
// ignore the negative value
//
bool ignoreNegative;// ignore the negative value
int64_t
tsWindow
;
// time window for derivative
//
int64_t tsWindow; // time window for derivative
bool
valueSet
;
// the value has been set already
//
bool valueSet; // the value has been set already
}
SDerivInfo
;
//
} SDerivInfo;
typedef
struct
SResPair
{
typedef
struct
SResPair
{
TSKEY
key
;
TSKEY
key
;
...
...
source/libs/function/src/texpr.c
浏览文件 @
efd28728
...
@@ -48,34 +48,6 @@ static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
...
@@ -48,34 +48,6 @@ static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
*
pExpr
=
NULL
;
*
pExpr
=
NULL
;
}
}
bool
exprTreeApplyFilter
(
tExprNode
*
pExpr
,
const
void
*
pItem
,
SExprTraverseSupp
*
param
)
{
#if 0
//non-leaf nodes, recursively traverse the expression tree in the post-root order
if (pLeft->nodeType == TEXPR_BINARYEXPR_NODE && pRight->nodeType == TEXPR_BINARYEXPR_NODE) {
if (pExpr->_node.optr == LOGIC_COND_TYPE_OR) { // or
if (exprTreeApplyFilter(pLeft, pItem, param)) {
return true;
}
// left child does not satisfy the query condition, try right child
return exprTreeApplyFilter(pRight, pItem, param);
} else { // and
if (!exprTreeApplyFilter(pLeft, pItem, param)) {
return false;
}
return exprTreeApplyFilter(pRight, pItem, param);
}
}
// handle the leaf node
param->setupInfoFn(pExpr, param->pExtInfo);
return param->nodeFilterFn(pItem, pExpr->_node.info);
#endif
return
0
;
}
// TODO: these three functions should be made global
// TODO: these three functions should be made global
static
void
*
exception_calloc
(
size_t
nmemb
,
size_t
size
)
{
static
void
*
exception_calloc
(
size_t
nmemb
,
size_t
size
)
{
void
*
p
=
taosMemoryCalloc
(
nmemb
,
size
);
void
*
p
=
taosMemoryCalloc
(
nmemb
,
size
);
...
@@ -101,214 +73,3 @@ static UNUSED_FUNC char* exception_strdup(const char* str) {
...
@@ -101,214 +73,3 @@ static UNUSED_FUNC char* exception_strdup(const char* str) {
return
p
;
return
p
;
}
}
void
buildFilterSetFromBinary
(
void
**
q
,
const
char
*
buf
,
int32_t
len
)
{
SBufferReader
br
=
tbufInitReader
(
buf
,
len
,
false
);
uint32_t
type
=
tbufReadUint32
(
&
br
);
SHashObj
*
pObj
=
taosHashInit
(
256
,
taosGetDefaultHashFunction
(
type
),
true
,
false
);
// taosHashSetEqualFp(pObj, taosGetDefaultEqualFunction(type));
int
dummy
=
-
1
;
int32_t
sz
=
tbufReadInt32
(
&
br
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
if
(
type
==
TSDB_DATA_TYPE_BOOL
||
IS_SIGNED_NUMERIC_TYPE
(
type
))
{
int64_t
val
=
tbufReadInt64
(
&
br
);
taosHashPut
(
pObj
,
(
char
*
)
&
val
,
sizeof
(
val
),
&
dummy
,
sizeof
(
dummy
));
}
else
if
(
IS_UNSIGNED_NUMERIC_TYPE
(
type
))
{
uint64_t
val
=
tbufReadUint64
(
&
br
);
taosHashPut
(
pObj
,
(
char
*
)
&
val
,
sizeof
(
val
),
&
dummy
,
sizeof
(
dummy
));
}
else
if
(
type
==
TSDB_DATA_TYPE_TIMESTAMP
)
{
int64_t
val
=
tbufReadInt64
(
&
br
);
taosHashPut
(
pObj
,
(
char
*
)
&
val
,
sizeof
(
val
),
&
dummy
,
sizeof
(
dummy
));
}
else
if
(
type
==
TSDB_DATA_TYPE_DOUBLE
||
type
==
TSDB_DATA_TYPE_FLOAT
)
{
double
val
=
tbufReadDouble
(
&
br
);
taosHashPut
(
pObj
,
(
char
*
)
&
val
,
sizeof
(
val
),
&
dummy
,
sizeof
(
dummy
));
}
else
if
(
type
==
TSDB_DATA_TYPE_BINARY
)
{
size_t
t
=
0
;
const
char
*
val
=
tbufReadBinary
(
&
br
,
&
t
);
taosHashPut
(
pObj
,
(
char
*
)
val
,
t
,
&
dummy
,
sizeof
(
dummy
));
}
else
if
(
type
==
TSDB_DATA_TYPE_NCHAR
)
{
size_t
t
=
0
;
const
char
*
val
=
tbufReadBinary
(
&
br
,
&
t
);
taosHashPut
(
pObj
,
(
char
*
)
val
,
t
,
&
dummy
,
sizeof
(
dummy
));
}
}
*
q
=
(
void
*
)
pObj
;
}
void
convertFilterSetFromBinary
(
void
**
q
,
const
char
*
buf
,
int32_t
len
,
uint32_t
tType
)
{
SBufferReader
br
=
tbufInitReader
(
buf
,
len
,
false
);
uint32_t
sType
=
tbufReadUint32
(
&
br
);
SHashObj
*
pObj
=
taosHashInit
(
256
,
taosGetDefaultHashFunction
(
tType
),
true
,
false
);
// taosHashSetEqualFp(pObj, taosGetDefaultEqualFunction(tType));
int
dummy
=
-
1
;
SVariant
tmpVar
=
{
0
};
size_t
t
=
0
;
int32_t
sz
=
tbufReadInt32
(
&
br
);
void
*
pvar
=
NULL
;
int64_t
val
=
0
;
int32_t
bufLen
=
0
;
if
(
IS_NUMERIC_TYPE
(
sType
))
{
bufLen
=
60
;
// The maximum length of string that a number is converted to.
}
else
{
bufLen
=
128
;
}
char
*
tmp
=
taosMemoryCalloc
(
1
,
bufLen
*
TSDB_NCHAR_SIZE
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
switch
(
sType
)
{
case
TSDB_DATA_TYPE_BOOL
:
case
TSDB_DATA_TYPE_UTINYINT
:
case
TSDB_DATA_TYPE_TINYINT
:
{
*
(
uint8_t
*
)
&
val
=
(
uint8_t
)
tbufReadInt64
(
&
br
);
t
=
sizeof
(
val
);
pvar
=
&
val
;
break
;
}
case
TSDB_DATA_TYPE_USMALLINT
:
case
TSDB_DATA_TYPE_SMALLINT
:
{
*
(
uint16_t
*
)
&
val
=
(
uint16_t
)
tbufReadInt64
(
&
br
);
t
=
sizeof
(
val
);
pvar
=
&
val
;
break
;
}
case
TSDB_DATA_TYPE_UINT
:
case
TSDB_DATA_TYPE_INT
:
{
*
(
uint32_t
*
)
&
val
=
(
uint32_t
)
tbufReadInt64
(
&
br
);
t
=
sizeof
(
val
);
pvar
=
&
val
;
break
;
}
case
TSDB_DATA_TYPE_TIMESTAMP
:
case
TSDB_DATA_TYPE_UBIGINT
:
case
TSDB_DATA_TYPE_BIGINT
:
{
*
(
uint64_t
*
)
&
val
=
(
uint64_t
)
tbufReadInt64
(
&
br
);
t
=
sizeof
(
val
);
pvar
=
&
val
;
break
;
}
case
TSDB_DATA_TYPE_DOUBLE
:
{
*
(
double
*
)
&
val
=
tbufReadDouble
(
&
br
);
t
=
sizeof
(
val
);
pvar
=
&
val
;
break
;
}
case
TSDB_DATA_TYPE_FLOAT
:
{
*
(
float
*
)
&
val
=
(
float
)
tbufReadDouble
(
&
br
);
t
=
sizeof
(
val
);
pvar
=
&
val
;
break
;
}
case
TSDB_DATA_TYPE_BINARY
:
{
pvar
=
(
char
*
)
tbufReadBinary
(
&
br
,
&
t
);
break
;
}
case
TSDB_DATA_TYPE_NCHAR
:
{
pvar
=
(
char
*
)
tbufReadBinary
(
&
br
,
&
t
);
break
;
}
default:
taosHashCleanup
(
pObj
);
*
q
=
NULL
;
return
;
}
taosVariantCreateFromBinary
(
&
tmpVar
,
(
char
*
)
pvar
,
t
,
sType
);
if
(
bufLen
<
t
)
{
tmp
=
taosMemoryRealloc
(
tmp
,
t
*
TSDB_NCHAR_SIZE
);
bufLen
=
(
int32_t
)
t
;
}
switch
(
tType
)
{
case
TSDB_DATA_TYPE_BOOL
:
case
TSDB_DATA_TYPE_UTINYINT
:
case
TSDB_DATA_TYPE_TINYINT
:
{
if
(
taosVariantDump
(
&
tmpVar
,
(
char
*
)
&
val
,
tType
,
false
))
{
goto
err_ret
;
}
pvar
=
&
val
;
t
=
sizeof
(
val
);
break
;
}
case
TSDB_DATA_TYPE_USMALLINT
:
case
TSDB_DATA_TYPE_SMALLINT
:
{
if
(
taosVariantDump
(
&
tmpVar
,
(
char
*
)
&
val
,
tType
,
false
))
{
goto
err_ret
;
}
pvar
=
&
val
;
t
=
sizeof
(
val
);
break
;
}
case
TSDB_DATA_TYPE_UINT
:
case
TSDB_DATA_TYPE_INT
:
{
if
(
taosVariantDump
(
&
tmpVar
,
(
char
*
)
&
val
,
tType
,
false
))
{
goto
err_ret
;
}
pvar
=
&
val
;
t
=
sizeof
(
val
);
break
;
}
case
TSDB_DATA_TYPE_TIMESTAMP
:
case
TSDB_DATA_TYPE_UBIGINT
:
case
TSDB_DATA_TYPE_BIGINT
:
{
if
(
taosVariantDump
(
&
tmpVar
,
(
char
*
)
&
val
,
tType
,
false
))
{
goto
err_ret
;
}
pvar
=
&
val
;
t
=
sizeof
(
val
);
break
;
}
case
TSDB_DATA_TYPE_DOUBLE
:
{
if
(
taosVariantDump
(
&
tmpVar
,
(
char
*
)
&
val
,
tType
,
false
))
{
goto
err_ret
;
}
pvar
=
&
val
;
t
=
sizeof
(
val
);
break
;
}
case
TSDB_DATA_TYPE_FLOAT
:
{
if
(
taosVariantDump
(
&
tmpVar
,
(
char
*
)
&
val
,
tType
,
false
))
{
goto
err_ret
;
}
pvar
=
&
val
;
t
=
sizeof
(
val
);
break
;
}
case
TSDB_DATA_TYPE_BINARY
:
{
if
(
taosVariantDump
(
&
tmpVar
,
tmp
,
tType
,
true
))
{
goto
err_ret
;
}
t
=
varDataLen
(
tmp
);
pvar
=
varDataVal
(
tmp
);
break
;
}
case
TSDB_DATA_TYPE_NCHAR
:
{
if
(
taosVariantDump
(
&
tmpVar
,
tmp
,
tType
,
true
))
{
goto
err_ret
;
}
t
=
varDataLen
(
tmp
);
pvar
=
varDataVal
(
tmp
);
break
;
}
default:
goto
err_ret
;
}
taosHashPut
(
pObj
,
(
char
*
)
pvar
,
t
,
&
dummy
,
sizeof
(
dummy
));
taosVariantDestroy
(
&
tmpVar
);
memset
(
&
tmpVar
,
0
,
sizeof
(
tmpVar
));
}
*
q
=
(
void
*
)
pObj
;
pObj
=
NULL
;
err_ret:
taosVariantDestroy
(
&
tmpVar
);
taosHashCleanup
(
pObj
);
taosMemoryFreeClear
(
tmp
);
}
source/util/src/tcache.c
浏览文件 @
efd28728
...
@@ -53,19 +53,19 @@ typedef struct SCacheEntry {
...
@@ -53,19 +53,19 @@ typedef struct SCacheEntry {
SCacheNode
*
next
;
SCacheNode
*
next
;
}
SCacheEntry
;
}
SCacheEntry
;
typedef
struct
STrashElem
{
struct
STrashElem
{
struct
STrashElem
*
prev
;
struct
STrashElem
*
prev
;
struct
STrashElem
*
next
;
struct
STrashElem
*
next
;
SCacheNode
*
pData
;
SCacheNode
*
pData
;
}
STrashElem
;
};
typedef
struct
SCacheIter
{
struct
SCacheIter
{
SCacheObj
*
pCacheObj
;
SCacheObj
*
pCacheObj
;
SCacheNode
**
pCurrent
;
SCacheNode
**
pCurrent
;
int32_t
entryIndex
;
int32_t
entryIndex
;
int32_t
index
;
int32_t
index
;
int32_t
numOfObj
;
int32_t
numOfObj
;
}
SCacheIter
;
};
/*
/*
* to accommodate the old data which has the same key value of new one in hashList
* to accommodate the old data which has the same key value of new one in hashList
...
...
tests/system-test/2-query/json_tag.py
浏览文件 @
efd28728
...
@@ -543,7 +543,7 @@ class TDTestCase:
...
@@ -543,7 +543,7 @@ class TDTestCase:
tdSql
.
checkData
(
0
,
0
,
1.5
)
tdSql
.
checkData
(
0
,
0
,
1.5
)
#tdSql.query("select last_row(dataint) from jsons1 where jtag->'tag1'>1")
#tdSql.query("select last_row(dataint) from jsons1 where jtag->'tag1'>1")
#tdSql.checkData(0, 0, 11)
#tdSql.checkData(0, 0, 11)
tdSql
.
error
(
"select interp(dataint) from jsons1 where ts = '2020-06-02 09:17:08.000' and jtag->'tag1'>1"
)
#
tdSql.error("select interp(dataint) from jsons1 where ts = '2020-06-02 09:17:08.000' and jtag->'tag1'>1")
#
#
# #test calculation function:diff/derivative/spread/ceil/floor/round/
# #test calculation function:diff/derivative/spread/ceil/floor/round/
#tdSql.error("select diff(dataint) from jsons1 where jtag->'tag1'>1")
#tdSql.error("select diff(dataint) from jsons1 where jtag->'tag1'>1")
...
...
taosadapter
@
9ce3f5c9
比较
9ce3f5c9
...
9ce3f5c9
Subproject commit 9ce3f5c98ef95d9c7c596c4ed7302b0ed69a92b2
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录