Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a60ce69b
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
a60ce69b
编写于
4月 01, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/qnode
上级
1bf4b375
3f6e96c6
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
160 addition
and
192 deletion
+160
-192
include/common/tcommon.h
include/common/tcommon.h
+2
-2
source/dnode/mnode/impl/src/mndInfoSchema.c
source/dnode/mnode/impl/src/mndInfoSchema.c
+0
-1
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+0
-4
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+17
-35
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-3
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+55
-75
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+77
-65
source/libs/scalar/src/sclvector.c
source/libs/scalar/src/sclvector.c
+7
-7
source/util/src/tcompare.c
source/util/src/tcompare.c
+1
-0
未找到文件。
include/common/tcommon.h
浏览文件 @
a60ce69b
...
...
@@ -209,8 +209,8 @@ typedef struct SGroupbyExpr {
}
SGroupbyExpr
;
enum
{
FUNC_PARAM_TYPE_VALUE
=
0
,
FUNC_PARAM_TYPE_COLUMN
,
FUNC_PARAM_TYPE_VALUE
=
0
x1
,
FUNC_PARAM_TYPE_COLUMN
=
0x2
,
};
typedef
struct
SFunctParam
{
...
...
source/dnode/mnode/impl/src/mndInfoSchema.c
浏览文件 @
a60ce69b
...
...
@@ -89,7 +89,6 @@ static const SInfosTableSchema userStbsSchema[] = {
{.
name
=
"create_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
},
{.
name
=
"columns"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"tags"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"tables"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"last_update"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
},
{.
name
=
"table_comment"
,
.
bytes
=
1024
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_INT
},
};
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
a60ce69b
...
...
@@ -1652,10 +1652,6 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32
*
(
int32_t
*
)
pWrite
=
pStb
->
numOfTags
;
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int32_t
*
)
pWrite
=
0
;
// number of tables
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int64_t
*
)
pWrite
=
pStb
->
updateTime
;
// number of tables
cols
++
;
...
...
source/libs/executor/inc/executil.h
浏览文件 @
a60ce69b
...
...
@@ -73,27 +73,12 @@ typedef struct SResultRowPosition {
}
SResultRowPosition
;
typedef
struct
SResultRowInfo
{
SList
*
pRows
;
SResultRowPosition
*
pPosition
;
SResultRow
**
pResult
;
// result list
int32_t
size
;
// number of result set
int32_t
capacity
;
// max capacity
int32_t
curPos
;
// current active result row index of pResult list
}
SResultRowInfo
;
typedef
struct
SResultRowPool
{
int32_t
elemSize
;
int32_t
blockSize
;
int32_t
numOfElemPerBlock
;
struct
{
int32_t
blockIndex
;
int32_t
pos
;
}
position
;
SArray
*
pData
;
// SArray<void*>
}
SResultRowPool
;
struct
STaskAttr
;
struct
STaskRuntimeEnv
;
struct
SUdfInfo
;
...
...
@@ -109,25 +94,33 @@ void resetResultRowInfo(struct STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo*
int32_t
numOfClosedResultRows
(
SResultRowInfo
*
pResultRowInfo
);
void
closeAllResultRows
(
SResultRowInfo
*
pResultRowInfo
);
int32_t
initResultRow
(
SResultRow
*
pResultRow
);
void
closeResultRow
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
);
bool
isResultRowClosed
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
);
void
clearResultRow
(
struct
STaskRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pResultRow
);
void
initResultRow
(
SResultRow
*
pResultRow
);
void
closeResultRow
(
SResultRow
*
pResultRow
);
bool
isResultRowClosed
(
SResultRow
*
pResultRow
);
struct
SResultRowEntryInfo
*
getResultCell
(
const
SResultRow
*
pRow
,
int32_t
index
,
int32_t
*
offset
);
void
*
destroyQueryFuncExpr
(
SExprInfo
*
pExprInfo
,
int32_t
numOfExpr
);
int32_t
getRowNumForMultioutput
(
struct
STaskAttr
*
pQueryAttr
,
bool
topBottomQuery
,
bool
stable
);
static
FORCE_INLINE
SResultRow
*
getResultRow
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
)
{
assert
(
pResultRowInfo
!=
NULL
&&
slot
>=
0
&&
slot
<
pResultRowInfo
->
size
);
return
pResultRowInfo
->
pResult
[
slot
];
static
FORCE_INLINE
SResultRow
*
getResultRow
(
SDiskbasedBuf
*
pBuf
,
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
)
{
ASSERT
(
pResultRowInfo
!=
NULL
&&
slot
>=
0
&&
slot
<
pResultRowInfo
->
size
);
SResultRowPosition
*
pos
=
&
pResultRowInfo
->
pPosition
[
slot
];
SFilePage
*
bufPage
=
(
SFilePage
*
)
getBufPage
(
pBuf
,
pos
->
pageId
);
SResultRow
*
pRow
=
(
SResultRow
*
)((
char
*
)
bufPage
+
pos
->
offset
);
return
pRow
;
}
static
FORCE_INLINE
SResultRow
*
getResultRowByPos
(
SDiskbasedBuf
*
pBuf
,
SResultRowPosition
*
pos
)
{
SFilePage
*
bufPage
=
(
SFilePage
*
)
getBufPage
(
pBuf
,
pos
->
pageId
);
SResultRow
*
pRow
=
(
SResultRow
*
)((
char
*
)
bufPage
+
pos
->
offset
);
return
pRow
;
}
static
FORCE_INLINE
char
*
getPosInResultPage
(
struct
STaskAttr
*
pQueryAttr
,
SFilePage
*
page
,
int32_t
rowOffset
,
int32_t
offset
)
{
assert
(
rowOffset
>=
0
&&
pQueryAttr
!=
NULL
);
ASSERT
(
0
);
// int32_t numOfRows = (int32_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery);
// return ((char *)page->data) + rowOffset + offset * numOfRows;
}
...
...
@@ -139,22 +132,11 @@ static FORCE_INLINE char* getPosInResultPage_rv(SFilePage* page, int32_t rowOffs
return
(
char
*
)
page
+
rowOffset
+
offset
*
numOfRows
;
}
//bool isNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type);
//bool notNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type);
__filter_func_t
getFilterOperator
(
int32_t
lowerOptr
,
int32_t
upperOptr
);
SResultRow
*
getNewResultRow
(
SResultRowPool
*
p
);
typedef
struct
{
SArray
*
pResult
;
// SArray<SResPair>
int32_t
colId
;
}
SStddevInterResult
;
void
interResToBinary
(
SBufferWriter
*
bw
,
SArray
*
pRes
,
int32_t
tagLen
);
SArray
*
interResFromBinary
(
const
char
*
data
,
int32_t
len
);
void
freeInterResult
(
void
*
param
);
void
initGroupResInfo
(
SGroupResInfo
*
pGroupResInfo
,
SResultRowInfo
*
pResultInfo
);
void
initMultiResInfoFromArrayList
(
SGroupResInfo
*
pGroupResInfo
,
SArray
*
pArrayList
);
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
a60ce69b
...
...
@@ -69,7 +69,7 @@ enum {
typedef
struct
SResultRowCell
{
uint64_t
groupId
;
SResultRow
*
pRow
;
SResultRow
Position
pos
;
}
SResultRowCell
;
/**
...
...
@@ -277,8 +277,6 @@ typedef struct STaskRuntimeEnv {
char
*
keyBuf
;
// window key buffer
// The window result objects pool, all the resultRow Objects are allocated and managed by this object.
char
**
prevRow
;
SResultRowPool
*
pool
;
SArray
*
prevResult
;
// intermediate result, SArray<SInterResult>
STSBuf
*
pTsBuf
;
// timestamp filter list
STSCursor
cur
;
...
...
source/libs/executor/src/executil.c
浏览文件 @
a60ce69b
...
...
@@ -53,15 +53,13 @@ int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) {
int32_t
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
size
)
{
pResultRowInfo
->
size
=
0
;
pResultRowInfo
->
curPos
=
-
1
;
pResultRowInfo
->
curPos
=
-
1
;
pResultRowInfo
->
capacity
=
size
;
pResultRowInfo
->
pResult
=
taosMemoryCalloc
(
pResultRowInfo
->
capacity
,
POINTER_BYTES
);
pResultRowInfo
->
pPosition
=
taosMemoryCalloc
(
pResultRowInfo
->
capacity
,
sizeof
(
SResultRowPosition
));
if
(
pResultRowInfo
->
pResult
==
NULL
||
pResultRowInfo
->
pPosition
==
NULL
)
{
if
(
pResultRowInfo
->
pPosition
==
NULL
)
{
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -71,17 +69,17 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) {
}
if
(
pResultRowInfo
->
capacity
==
0
)
{
assert
(
pResultRowInfo
->
pResult
==
NULL
);
//
assert(pResultRowInfo->pResult == NULL);
return
;
}
for
(
int32_t
i
=
0
;
i
<
pResultRowInfo
->
size
;
++
i
)
{
if
(
pResultRowInfo
->
pResult
[
i
])
{
taosMemoryFreeClear
(
pResultRowInfo
->
pResult
[
i
]
->
key
);
}
//
if (pResultRowInfo->pResult[i]) {
//
taosMemoryFreeClear(pResultRowInfo->pResult[i]->key);
//
}
}
taosMemoryFreeClear
(
pResultRowInfo
->
p
Result
);
taosMemoryFreeClear
(
pResultRowInfo
->
p
Position
);
}
void
resetResultRowInfo
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
)
{
...
...
@@ -90,8 +88,8 @@ void resetResultRowInfo(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRow
}
for
(
int32_t
i
=
0
;
i
<
pResultRowInfo
->
size
;
++
i
)
{
SResultRow
*
pWindowRes
=
pResultRowInfo
->
pResult
[
i
];
clearResultRow
(
pRuntimeEnv
,
pWindowRes
);
//
SResultRow *pWindowRes = pResultRowInfo->pResult[i];
//
clearResultRow(pRuntimeEnv, pWindowRes);
int32_t
groupIndex
=
0
;
int64_t
uid
=
0
;
...
...
@@ -101,14 +99,13 @@ void resetResultRowInfo(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRow
}
pResultRowInfo
->
size
=
0
;
pResultRowInfo
->
curPos
=
-
1
;
}
int32_t
numOfClosedResultRows
(
SResultRowInfo
*
pResultRowInfo
)
{
int32_t
i
=
0
;
while
(
i
<
pResultRowInfo
->
size
&&
pResultRowInfo
->
pResult
[
i
]
->
closed
)
{
++
i
;
}
//
while (i < pResultRowInfo->size && pResultRowInfo->pResult[i]->closed) {
//
++i;
//
}
return
i
;
}
...
...
@@ -117,21 +114,22 @@ void closeAllResultRows(SResultRowInfo *pResultRowInfo) {
assert
(
pResultRowInfo
->
size
>=
0
&&
pResultRowInfo
->
capacity
>=
pResultRowInfo
->
size
);
for
(
int32_t
i
=
0
;
i
<
pResultRowInfo
->
size
;
++
i
)
{
SResultRow
*
pRow
=
pResultRowInfo
->
pResult
[
i
];
if
(
pRow
->
closed
)
{
continue
;
}
// ASSERT(0);
// SResultRow* pRow = pResultRowInfo->pResult[i];
// if (pRow->closed) {
// continue;
// }
pRow
->
closed
=
true
;
//
pRow->closed = true;
}
}
bool
isResultRowClosed
(
SResultRow
Info
*
pResultRowInfo
,
int32_t
slot
)
{
return
(
getResultRow
(
pResultRowInfo
,
slot
)
->
closed
==
true
);
bool
isResultRowClosed
(
SResultRow
*
pRow
)
{
return
(
pRow
->
closed
==
true
);
}
void
closeResultRow
(
SResultRow
Info
*
pResultRowInfo
,
int32_t
slot
)
{
getResultRow
(
pResultRowInfo
,
slot
)
->
closed
=
true
;
void
closeResultRow
(
SResultRow
*
pResultRow
)
{
pResultRow
->
closed
=
true
;
}
void
clearResultRow
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pResultRow
)
{
...
...
@@ -181,29 +179,6 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
return
rowSize
;
}
SResultRow
*
getNewResultRow
(
SResultRowPool
*
p
)
{
if
(
p
==
NULL
)
{
return
NULL
;
}
void
*
ptr
=
NULL
;
if
(
p
->
position
.
pos
==
0
)
{
ptr
=
taosMemoryCalloc
(
1
,
p
->
blockSize
);
taosArrayPush
(
p
->
pData
,
&
ptr
);
}
else
{
size_t
last
=
taosArrayGetSize
(
p
->
pData
);
void
**
pBlock
=
taosArrayGet
(
p
->
pData
,
last
-
1
);
ptr
=
((
char
*
)
(
*
pBlock
))
+
p
->
elemSize
*
p
->
position
.
pos
;
}
p
->
position
.
pos
=
(
p
->
position
.
pos
+
1
)
%
p
->
numOfElemPerBlock
;
initResultRow
(
ptr
);
return
ptr
;
}
void
cleanupGroupResInfo
(
SGroupResInfo
*
pGroupResInfo
)
{
assert
(
pGroupResInfo
!=
NULL
);
...
...
@@ -261,8 +236,9 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
return
(
int32_t
)
taosArrayGetSize
(
pGroupResInfo
->
pRows
);
}
static
int64_t
getNumOfResultWindowRes
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pResultRow
,
int32_t
*
rowCellInfoOffset
)
{
static
int64_t
getNumOfResultWindowRes
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SResultRow
Position
*
pos
,
int32_t
*
rowCellInfoOffset
)
{
STaskAttr
*
pQueryAttr
=
pRuntimeEnv
->
pQueryAttr
;
ASSERT
(
0
);
for
(
int32_t
j
=
0
;
j
<
pQueryAttr
->
numOfOutput
;
++
j
)
{
int32_t
functionId
=
0
;
//pQueryAttr->pExpr1[j].base.functionId;
...
...
@@ -305,25 +281,26 @@ static int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *
return
-
1
;
}
ASSERT
(
0
);
STableQueryInfo
**
pList
=
supporter
->
pTableQueryInfo
;
SResultRow
*
pWindowRes1
=
pList
[
left
]
->
resInfo
.
pResult
[
leftPos
];
//
SResultRow* pWindowRes1 = pList[left]->resInfo.pResult[leftPos];
// SResultRow * pWindowRes1 = getResultRow(&(pList[left]->resInfo), leftPos);
TSKEY
leftTimestamp
=
pWindowRes1
->
win
.
skey
;
//
TSKEY leftTimestamp = pWindowRes1->win.skey;
// SResultRowInfo *pWindowResInfo2 = &(pList[right]->resInfo);
// SResultRow * pWindowRes2 = getResultRow(pWindowResInfo2, rightPos);
SResultRow
*
pWindowRes2
=
pList
[
right
]
->
resInfo
.
pResult
[
rightPos
];
TSKEY
rightTimestamp
=
pWindowRes2
->
win
.
skey
;
//
SResultRow* pWindowRes2 = pList[right]->resInfo.pResult[rightPos];
//
TSKEY rightTimestamp = pWindowRes2->win.skey;
if
(
leftTimestamp
==
rightTimestamp
)
{
//
if (leftTimestamp == rightTimestamp) {
return
0
;
}
//
}
if
(
supporter
->
order
==
TSDB_ORDER_ASC
)
{
return
(
leftTimestamp
>
rightTimestamp
)
?
1
:-
1
;
}
else
{
return
(
leftTimestamp
<
rightTimestamp
)
?
1
:-
1
;
}
//
if (supporter->order == TSDB_ORDER_ASC) {
//
return (leftTimestamp > rightTimestamp)? 1:-1;
//
} else {
//
return (leftTimestamp < rightTimestamp)? 1:-1;
//
}
}
int32_t
tsAscOrder
(
const
void
*
p1
,
const
void
*
p2
)
{
...
...
@@ -331,11 +308,12 @@ int32_t tsAscOrder(const void* p1, const void* p2) {
SResultRowCell
*
pc2
=
(
SResultRowCell
*
)
p2
;
if
(
pc1
->
groupId
==
pc2
->
groupId
)
{
if
(
pc1
->
pRow
->
win
.
skey
==
pc2
->
pRow
->
win
.
skey
)
{
return
0
;
}
else
{
return
(
pc1
->
pRow
->
win
.
skey
<
pc2
->
pRow
->
win
.
skey
)
?
-
1
:
1
;
}
ASSERT
(
0
);
// if (pc1->pRow->win.skey == pc2->pRow->win.skey) {
// return 0;
// } else {
// return (pc1->pRow->win.skey < pc2->pRow->win.skey)? -1:1;
// }
}
else
{
return
(
pc1
->
groupId
<
pc2
->
groupId
)
?
-
1
:
1
;
}
...
...
@@ -346,11 +324,12 @@ int32_t tsDescOrder(const void* p1, const void* p2) {
SResultRowCell
*
pc2
=
(
SResultRowCell
*
)
p2
;
if
(
pc1
->
groupId
==
pc2
->
groupId
)
{
if
(
pc1
->
pRow
->
win
.
skey
==
pc2
->
pRow
->
win
.
skey
)
{
return
0
;
}
else
{
return
(
pc1
->
pRow
->
win
.
skey
<
pc2
->
pRow
->
win
.
skey
)
?
1
:-
1
;
}
ASSERT
(
0
);
// if (pc1->pRow->win.skey == pc2->pRow->win.skey) {
// return 0;
// } else {
// return (pc1->pRow->win.skey < pc2->pRow->win.skey)? 1:-1;
// }
}
else
{
return
(
pc1
->
groupId
<
pc2
->
groupId
)
?
-
1
:
1
;
}
...
...
@@ -384,13 +363,13 @@ static int32_t mergeIntoGroupResultImplRv(STaskRuntimeEnv *pRuntimeEnv, SGroupRe
break
;
}
int64_t
num
=
getNumOfResultWindowRes
(
pRuntimeEnv
,
pResultRowCell
->
pRow
,
rowCellInfoOffset
);
int64_t
num
=
getNumOfResultWindowRes
(
pRuntimeEnv
,
&
pResultRowCell
->
pos
,
rowCellInfoOffset
);
if
(
num
<=
0
)
{
continue
;
}
taosArrayPush
(
pGroupResInfo
->
pRows
,
&
pResultRowCell
->
p
Row
);
pResultRowCell
->
pRow
->
numOfRows
=
(
uint32_t
)
num
;
taosArrayPush
(
pGroupResInfo
->
pRows
,
&
pResultRowCell
->
p
os
);
//
pResultRowCell->pRow->numOfRows = (uint32_t) num;
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -449,9 +428,10 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv
int32_t
tableIndex
=
tMergeTreeGetChosenIndex
(
pTree
);
SResultRowInfo
*
pWindowResInfo
=
&
pTableQueryInfoList
[
tableIndex
]
->
resInfo
;
SResultRow
*
pWindowRes
=
getResultRow
(
pWindowResInfo
,
cs
.
rowIndex
[
tableIndex
]);
ASSERT
(
0
);
SResultRow
*
pWindowRes
=
NULL
;
//getResultRow(pBuf, pWindowResInfo, cs.rowIndex[tableIndex]);
int64_t
num
=
getNumOfResultWindowRes
(
pRuntimeEnv
,
pWindowRes
,
rowCellInfoOffset
);
int64_t
num
=
0
;
//
getNumOfResultWindowRes(pRuntimeEnv, pWindowRes, rowCellInfoOffset);
if
(
num
<=
0
)
{
cs
.
rowIndex
[
tableIndex
]
+=
1
;
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
a60ce69b
...
...
@@ -426,18 +426,10 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, jmp_buf env)
newCapacity
+=
4
;
}
char
*
t
=
taosMemoryRealloc
(
pResultRowInfo
->
pResult
,
(
size_t
)(
newCapacity
*
POINTER_BYTES
));
if
(
t
==
NULL
)
{
longjmp
(
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
pResultRowInfo
->
pPosition
=
taosMemoryRealloc
(
pResultRowInfo
->
pPosition
,
newCapacity
*
sizeof
(
SResultRowPosition
));
pResultRowInfo
->
pResult
=
(
SResultRow
**
)
t
;
int32_t
inc
=
(
int32_t
)
newCapacity
-
pResultRowInfo
->
capacity
;
memset
(
&
pResultRowInfo
->
pResult
[
pResultRowInfo
->
capacity
],
0
,
POINTER_BYTES
*
inc
);
memset
(
&
pResultRowInfo
->
pPosition
[
pResultRowInfo
->
capacity
],
0
,
sizeof
(
SResultRowPosition
));
pResultRowInfo
->
capacity
=
(
int32_t
)
newCapacity
;
}
...
...
@@ -458,9 +450,8 @@ static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pR
if
(
p1
!=
NULL
)
{
if
(
pResultRowInfo
->
size
==
0
)
{
existed
=
false
;
assert
(
pResultRowInfo
->
curPos
==
-
1
);
}
else
if
(
pResultRowInfo
->
size
==
1
)
{
existed
=
(
pResultRowInfo
->
pResult
[
0
]
==
(
*
p1
));
//
existed = (pResultRowInfo->pResult[0] == (*p1));
}
else
{
// check if current pResultRowInfo contains the existed pResultRow
SET_RES_EXT_WINDOW_KEY
(
pRuntimeEnv
->
keyBuf
,
pData
,
bytes
,
uid
,
pResultRowInfo
);
int64_t
*
index
=
...
...
@@ -479,6 +470,7 @@ static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pR
return
p1
!=
NULL
;
}
#if 0
static SResultRow* doSetResultOutBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, int64_t tid,
char* pData, int16_t bytes, bool masterscan, uint64_t tableGroupId) {
bool existed = false;
...
...
@@ -496,16 +488,16 @@ static SResultRow* doSetResultOutBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultR
if (p1 != NULL) {
if (pResultRowInfo->size == 0) {
existed = false;
assert
(
pResultRowInfo
->
curPos
==
-
1
);
//
assert(pResultRowInfo->curPos == -1);
} else if (pResultRowInfo->size == 1) {
existed
=
(
pResultRowInfo
->
pResult
[
0
]
==
(
*
p1
));
pResultRowInfo
->
curPos
=
0
;
//
existed = (pResultRowInfo->pResult[0] == (*p1));
//
pResultRowInfo->curPos = 0;
} else { // check if current pResultRowInfo contains the existed pResultRow
SET_RES_EXT_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tid, pResultRowInfo);
int64_t* index =
taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes));
if (index != NULL) {
pResultRowInfo
->
curPos
=
(
int32_t
)
*
index
;
//
pResultRowInfo->curPos = (int32_t)*index;
existed = true;
} else {
existed = false;
...
...
@@ -555,6 +547,7 @@ static SResultRow* doSetResultOutBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultR
return pResultRowInfo->pResult[pResultRowInfo->curPos];
}
#endif
SResultRow
*
getNewResultRow_rv
(
SDiskbasedBuf
*
pResultBuf
,
int64_t
tableGroupId
,
int32_t
interBufSize
)
{
SFilePage
*
pData
=
NULL
;
...
...
@@ -599,65 +592,75 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId,
static
SResultRow
*
doSetResultOutBufByKey_rv
(
SDiskbasedBuf
*
pResultBuf
,
SResultRowInfo
*
pResultRowInfo
,
int64_t
tid
,
char
*
pData
,
int16_t
bytes
,
bool
masterscan
,
uint64_t
tableGroupId
,
SExecTaskInfo
*
pTaskInfo
,
bool
isIntervalQuery
,
SAggSupporter
*
pSup
)
{
bool
exist
ed
=
false
;
bool
exist
InCurrentResusltRowInfo
=
false
;
SET_RES_WINDOW_KEY
(
pSup
->
keyBuf
,
pData
,
bytes
,
tableGroupId
);
SResultRow
**
p1
=
(
SResultRow
*
*
)
taosHashGet
(
pSup
->
pResultRowHashTable
,
pSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
SResultRow
Position
*
p1
=
(
SResultRowPosition
*
)
taosHashGet
(
pSup
->
pResultRowHashTable
,
pSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
// in case of repeat scan/reverse scan, no new time window added.
if
(
isIntervalQuery
)
{
if
(
!
masterscan
)
{
// the *p1 may be NULL in case of sliding+offset exists.
return
(
p1
!=
NULL
)
?
*
p1
:
NULL
;
if
(
p1
!=
NULL
)
{
return
getResultRowByPos
(
pResultBuf
,
p1
);
}
else
{
return
NULL
;
}
}
if
(
p1
!=
NULL
)
{
if
(
pResultRowInfo
->
size
==
0
)
{
exist
ed
=
false
;
exist
InCurrentResusltRowInfo
=
false
;
// this time window created by other timestamp that does not belongs to current table.
assert
(
pResultRowInfo
->
curPos
==
-
1
);
}
else
if
(
pResultRowInfo
->
size
==
1
)
{
existed
=
(
pResultRowInfo
->
pResult
[
0
]
==
(
*
p1
)
);
pResultRowInfo
->
curPos
=
0
;
}
else
{
// check if current pResultRowInfo contains the exist
ed
pResultRow
ASSERT
(
0
);
// existInCurrentResusltRowInfo = (pResultRowInfo->pResult[0] == (*p1))
;
}
else
{
// check if current pResultRowInfo contains the exist
InCurrentResusltRowInfo
pResultRow
SET_RES_EXT_WINDOW_KEY
(
pSup
->
keyBuf
,
pData
,
bytes
,
tid
,
pResultRowInfo
);
int64_t
*
index
=
taosHashGet
(
pSup
->
pResultRowListSet
,
pSup
->
keyBuf
,
GET_RES_EXT_WINDOW_KEY_LEN
(
bytes
));
if
(
index
!=
NULL
)
{
pResultRowInfo
->
curPos
=
(
int32_t
)
*
index
;
existed
=
true
;
// TODO check the scan order for current opened time window
// pResultRowInfo->curPos = (int32_t)*index;
existInCurrentResusltRowInfo
=
true
;
}
else
{
exist
ed
=
false
;
exist
InCurrentResusltRowInfo
=
false
;
}
}
}
}
else
{
// In case of group by column query, the required SResultRow object must be exist
ed
in the pResultRowInfo object.
// In case of group by column query, the required SResultRow object must be exist
InCurrentResusltRowInfo
in the pResultRowInfo object.
if
(
p1
!=
NULL
)
{
return
*
p1
;
return
getResultRowByPos
(
pResultBuf
,
p1
)
;
}
}
if
(
!
existed
)
{
prepareResultListBuffer
(
pResultRowInfo
,
pTaskInfo
->
env
);
SResultRow
*
pResult
=
NULL
;
if
(
!
existInCurrentResusltRowInfo
)
{
// 1. close current opened time window
if
(
pResultRowInfo
->
curPos
!=
-
1
)
{
// todo extract function
SResultRowPosition
*
pos
=
&
pResultRowInfo
->
pPosition
[
pResultRowInfo
->
curPos
];
SFilePage
*
pPage
=
getBufPage
(
pResultBuf
,
pos
->
pageId
);
SResultRow
*
pRow
=
(
SResultRow
*
)((
char
*
)
pPage
+
pos
->
offset
);
closeResultRow
(
pRow
);
releaseBufPage
(
pResultBuf
,
pPage
);
}
SResultRow
*
pResult
=
NULL
;
prepareResultListBuffer
(
pResultRowInfo
,
pTaskInfo
->
env
)
;
if
(
p1
==
NULL
)
{
pResult
=
getNewResultRow_rv
(
pResultBuf
,
tableGroupId
,
pSup
->
resultRowSize
);
int32_t
ret
=
initResultRow
(
pResult
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
initResultRow
(
pResult
);
// add a new result set for a new group
taosHashPut
(
pSup
->
pResultRowHashTable
,
pSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
),
&
pResult
,
POINTER_BYTES
);
SResultRowCell
cell
=
{.
groupId
=
tableGroupId
,
.
pRow
=
pResult
};
SResultRowPosition
pos
=
{.
pageId
=
pResult
->
pageId
,
.
offset
=
pResult
->
offset
};
taosHashPut
(
pSup
->
pResultRowHashTable
,
pSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
),
&
pos
,
POINTER_BYTES
);
SResultRowCell
cell
=
{.
groupId
=
tableGroupId
,
.
pos
=
pos
};
taosArrayPush
(
pSup
->
pResultRowArrayList
,
&
cell
);
}
else
{
pResult
=
*
p1
;
pResult
=
getResultRowByPos
(
pResultBuf
,
p1
)
;
}
// 2. set the new time window to be the new active time window
pResultRowInfo
->
curPos
=
pResultRowInfo
->
size
;
pResultRowInfo
->
pPosition
[
pResultRowInfo
->
size
]
=
(
SResultRowPosition
){.
pageId
=
pResult
->
pageId
,
.
offset
=
pResult
->
offset
};
pResultRowInfo
->
pResult
[
pResultRowInfo
->
size
++
]
=
pResult
;
pResultRowInfo
->
pPosition
[
pResultRowInfo
->
size
++
]
=
(
SResultRowPosition
){.
pageId
=
pResult
->
pageId
,
.
offset
=
pResult
->
offset
};
int64_t
index
=
pResultRowInfo
->
curPos
;
SET_RES_EXT_WINDOW_KEY
(
pSup
->
keyBuf
,
pData
,
bytes
,
tid
,
pResultRowInfo
);
...
...
@@ -669,7 +672,7 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW
);
}
return
pResult
RowInfo
->
pResult
[
pResultRowInfo
->
curPos
]
;
return
pResult
;
}
static
void
getInitialStartTimeWindow
(
SInterval
*
pInterval
,
int32_t
precision
,
TSKEY
ts
,
STimeWindow
*
w
,
TSKEY
ekey
,
...
...
@@ -693,7 +696,7 @@ static void getInitialStartTimeWindow(SInterval* pInterval, int32_t precision, T
}
// get the correct time window according to the handled timestamp
static
STimeWindow
getActiveTimeWindow
(
SResultRowInfo
*
pResultRowInfo
,
int64_t
ts
,
SInterval
*
pInterval
,
static
STimeWindow
getActiveTimeWindow
(
S
DiskbasedBuf
*
pBuf
,
S
ResultRowInfo
*
pResultRowInfo
,
int64_t
ts
,
SInterval
*
pInterval
,
int32_t
precision
,
STimeWindow
*
win
)
{
STimeWindow
w
=
{
0
};
...
...
@@ -701,7 +704,7 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo* pResultRowInfo, int64_t t
getInitialStartTimeWindow
(
pInterval
,
precision
,
ts
,
&
w
,
win
->
ekey
,
true
);
w
.
ekey
=
taosTimeAdd
(
w
.
skey
,
pInterval
->
interval
,
pInterval
->
intervalUnit
,
precision
)
-
1
;
}
else
{
w
=
getResultRow
(
pResultRowInfo
,
pResultRowInfo
->
curPos
)
->
win
;
w
=
getResultRow
(
p
Buf
,
p
ResultRowInfo
,
pResultRowInfo
->
curPos
)
->
win
;
}
if
(
w
.
skey
>
ts
||
w
.
ekey
<
ts
)
{
...
...
@@ -730,7 +733,7 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo* pResultRowInfo, int64_t t
// get the correct time window according to the handled timestamp
static
STimeWindow
getCurrentActiveTimeWindow
(
SResultRowInfo
*
pResultRowInfo
,
int64_t
ts
,
STaskAttr
*
pQueryAttr
)
{
STimeWindow
w
=
{
0
};
#if 0
if (pResultRowInfo->curPos == -1) { // the first window, from the previous stored value
// getInitialStartTimeWindow(pQueryAttr, ts, &w);
...
...
@@ -742,7 +745,7 @@ static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo* pResultRowInfo, in
w.ekey = w.skey + pQueryAttr->interval.interval - 1;
}
} else {
w
=
getResultRow
(
pResultRowInfo
,
pResultRowInfo
->
curPos
)
->
win
;
w =
pRow
->win;
}
/*
...
...
@@ -752,6 +755,7 @@ static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo* pResultRowInfo, in
if (w.ekey > pQueryAttr->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) {
w.ekey = pQueryAttr->window.ekey;
}
#endif
return
w
;
}
...
...
@@ -816,8 +820,8 @@ static int32_t setResultOutputBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowI
assert
(
win
->
skey
<=
win
->
ekey
);
SDiskbasedBuf
*
pResultBuf
=
pRuntimeEnv
->
pResultBuf
;
SResultRow
*
pResultRow
=
doSetResultOutBufByKey
(
pRuntimeEnv
,
pResultRowInfo
,
tid
,
(
char
*
)
&
win
->
skey
,
TSDB_KEYSIZE
,
masterscan
,
tableGroupId
);
SResultRow
*
pResultRow
=
NULL
;
//
doSetResultOutBufByKey(pRuntimeEnv, pResultRowInfo, tid, (char*)&win->skey, TSDB_KEYSIZE,
//
masterscan, tableGroupId);
if
(
pResultRow
==
NULL
)
{
*
pResult
=
NULL
;
return
TSDB_CODE_SUCCESS
;
...
...
@@ -909,9 +913,9 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se
return
forwardStep
;
}
static
void
doUpdateResultRowIndex
(
SResultRowInfo
*
pResultRowInfo
,
TSKEY
lastKey
,
bool
ascQuery
,
bool
timeWindowInterpo
)
{
static
void
doUpdateResultRowIndex
(
SResultRowInfo
*
pResultRowInfo
,
TSKEY
lastKey
,
bool
ascQuery
,
bool
timeWindowInterpo
)
{
int64_t
skey
=
TSKEY_INITIAL_VAL
;
#if 0
int32_t i = 0;
for (i = pResultRowInfo->size - 1; i >= 0; --i) {
SResultRow* pResult = pResultRowInfo->pResult[i];
...
...
@@ -963,6 +967,7 @@ static void doUpdateResultRowIndex(SResultRowInfo* pResultRowInfo, TSKEY lastKey
pResultRowInfo->curPos = i + 1; // current not closed result object
}
}
#endif
}
static
void
updateResultRowInfoActiveIndex
(
SResultRowInfo
*
pResultRowInfo
,
const
STimeWindow
*
pWin
,
TSKEY
lastKey
,
...
...
@@ -1253,8 +1258,8 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx,
ASSERT
(
pCtx
[
i
].
input
.
pData
[
j
]
!=
NULL
);
}
}
// setBlockStatisInfo(&pCtx[i], pBlock, pOperator->pExpr[i].base.pColumns);
// setBlockStatisInfo(&pCtx[i], pBlock, pOperator->pExpr[i].base.pColumns);
// uint32_t flag = pOperator->pExpr[i].base.pParam[0].pCol->flag;
// if (TSDB_COL_IS_NORMAL_COL(flag) /*|| (pCtx[i].functionId == FUNCTION_BLKINFO) ||
// (TSDB_COL_IS_TAG(flag) && pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)*/) {
...
...
@@ -1551,14 +1556,14 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
if
(
pSDataBlock
->
pDataBlock
!=
NULL
)
{
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pSDataBlock
->
pDataBlock
,
0
);
tsCols
=
(
int64_t
*
)
pColDataInfo
->
pData
;
// assert(tsCols[0] == pSDataBlock->info.window.skey &&
//
tsCols[pSDataBlock->info.rows - 1] ==
pSDataBlock->info.window.ekey);
// assert(tsCols[0] == pSDataBlock->info.window.skey &&
tsCols[pSDataBlock->info.rows - 1] ==
// pSDataBlock->info.window.ekey);
}
int32_t
startPos
=
ascScan
?
0
:
(
pSDataBlock
->
info
.
rows
-
1
);
TSKEY
ts
=
getStartTsKey
(
&
pSDataBlock
->
info
.
window
,
tsCols
,
pSDataBlock
->
info
.
rows
,
ascScan
);
STimeWindow
win
=
getActiveTimeWindow
(
pResultRowInfo
,
ts
,
&
pInfo
->
interval
,
pInfo
->
interval
.
precision
,
&
pInfo
->
win
);
STimeWindow
win
=
getActiveTimeWindow
(
p
Info
->
aggSup
.
pResultBuf
,
p
ResultRowInfo
,
ts
,
&
pInfo
->
interval
,
pInfo
->
interval
.
precision
,
&
pInfo
->
win
);
bool
masterScan
=
true
;
SResultRow
*
pResult
=
NULL
;
...
...
@@ -1581,6 +1586,8 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
// prev time window not interpolation yet.
int32_t
curIndex
=
pResultRowInfo
->
curPos
;
#if 0
if (prevIndex != -1 && prevIndex < curIndex && pInfo->timeWindowInterpo) {
for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already.
SResultRow* pRes = getResultRow(pResultRowInfo, j);
...
...
@@ -1615,6 +1622,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
#endif
// window start key interpolation
doWindowBorderInterpolation
(
pOperatorInfo
,
pSDataBlock
,
pInfo
->
binfo
.
pCtx
,
pResult
,
&
win
,
startPos
,
forwardStep
,
...
...
@@ -3430,10 +3438,8 @@ void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
}
}
// TODO fix this bug.
int32_t
initResultRow
(
SResultRow
*
pResultRow
)
{
void
initResultRow
(
SResultRow
*
pResultRow
)
{
pResultRow
->
pEntryInfo
=
(
struct
SResultRowEntryInfo
*
)((
char
*
)
pResultRow
+
sizeof
(
SResultRow
));
return
TSDB_CODE_SUCCESS
;
}
/*
...
...
@@ -3449,7 +3455,9 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t
SqlFunctionCtx
*
pCtx
=
pInfo
->
pCtx
;
SSDataBlock
*
pDataBlock
=
pInfo
->
pRes
;
int32_t
*
rowCellInfoOffset
=
pInfo
->
rowCellInfoOffset
;
SResultRowInfo
*
pResultRowInfo
=
&
pInfo
->
resultRowInfo
;
initResultRowInfo
(
pResultRowInfo
,
16
);
int64_t
tid
=
0
;
int64_t
groupId
=
0
;
...
...
@@ -3610,9 +3618,11 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD
SFilePage
*
bufPage
=
getBufPage
(
pBuf
,
pPos
->
pageId
);
SResultRow
*
pRow
=
(
SResultRow
*
)((
char
*
)
bufPage
+
pPos
->
offset
);
if
(
!
isResultRowClosed
(
pResultRowInfo
,
i
))
{
continue
;
}
// TODO ignore the close status anyway.
// if (!isResultRowClosed(pRow)) {
// continue;
// }
for
(
int32_t
j
=
0
;
j
<
numOfOutput
;
++
j
)
{
pCtx
[
j
].
resultInfo
=
getResultCell
(
pRow
,
j
,
rowCellInfoOffset
);
...
...
@@ -3622,7 +3632,7 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD
continue
;
}
if
(
pCtx
[
j
].
fpSet
.
process
)
{
// TODO set the dummy function.
if
(
pCtx
[
j
].
fpSet
.
process
)
{
// TODO set the dummy function
, to avoid the check for null ptr
.
pCtx
[
j
].
fpSet
.
finalize
(
&
pCtx
[
j
]);
}
...
...
@@ -4132,7 +4142,7 @@ static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutpu
// if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
// return;
// }
#if 0
for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
SResultRow* pResult = pResultRowInfo->pResult[i];
...
...
@@ -4146,6 +4156,8 @@ static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutpu
pResult->numOfRows = (uint16_t)(TMAX(pResult->numOfRows, pCell->numOfRes));
}
}
#endif
}
static
int32_t
compressQueryColData
(
SColumnInfoData
*
pColRes
,
int32_t
numOfRows
,
char
*
data
,
int8_t
compressed
)
{
...
...
@@ -7585,10 +7597,10 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
int32_t
doInitAggInfoSup
(
SAggSupporter
*
pAggSup
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
const
char
*
pKey
)
{
_hash_fn_t
hashFn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
);
pAggSup
->
resultRowSize
=
getResultRowSize
(
pCtx
,
numOfOutput
);
pAggSup
->
keyBuf
=
taosMemoryCalloc
(
1
,
sizeof
(
int64_t
)
+
sizeof
(
int64_t
)
+
POINTER_BYTES
);
pAggSup
->
resultRowSize
=
getResultRowSize
(
pCtx
,
numOfOutput
);
pAggSup
->
keyBuf
=
taosMemoryCalloc
(
1
,
sizeof
(
int64_t
)
+
sizeof
(
int64_t
)
+
POINTER_BYTES
);
pAggSup
->
pResultRowHashTable
=
taosHashInit
(
10
,
hashFn
,
true
,
HASH_NO_LOCK
);
pAggSup
->
pResultRowListSet
=
taosHashInit
(
100
,
hashFn
,
false
,
HASH_NO_LOCK
);
pAggSup
->
pResultRowListSet
=
taosHashInit
(
100
,
hashFn
,
false
,
HASH_NO_LOCK
);
pAggSup
->
pResultRowArrayList
=
taosArrayInit
(
10
,
sizeof
(
SResultRowCell
));
if
(
pAggSup
->
keyBuf
==
NULL
||
pAggSup
->
pResultRowArrayList
==
NULL
||
pAggSup
->
pResultRowListSet
==
NULL
||
...
...
@@ -8761,8 +8773,8 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
pExp
->
pExpr
->
_optrRoot
.
pRootNode
=
pTargetNode
->
pExpr
;
pExp
->
base
.
pParam
[
0
].
type
=
FUNC_PARAM_TYPE_COLUMN
;
pExp
->
base
.
pParam
[
0
].
pCol
=
createColumn
(
pTargetNode
->
dataBlockId
,
pTargetNode
->
slotId
,
pType
);
//
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
//
pExp->base.pParam[0].pCol = createColumn(pTargetNode->dataBlockId, pTargetNode->slotId, pType);
}
else
{
ASSERT
(
0
);
}
...
...
source/libs/scalar/src/sclvector.c
浏览文件 @
a60ce69b
...
...
@@ -720,7 +720,7 @@ void vectorMathRemainder(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam
double
lx
=
getVectorDoubleValueFnLeft
(
pLeftCol
->
pData
,
i
);
double
rx
=
getVectorDoubleValueFnRight
(
pRightCol
->
pData
,
i
);
if
(
compareDoubleVal
(
&
zero
,
&
rx
))
{
if
(
isnan
(
lx
)
||
isinf
(
lx
)
||
isnan
(
rx
)
||
isinf
(
rx
))
{
colDataAppend
(
pOutputCol
,
i
,
NULL
,
true
);
continue
;
}
...
...
@@ -729,7 +729,7 @@ void vectorMathRemainder(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam
}
}
else
if
(
pLeft
->
numOfRows
==
1
)
{
double
lx
=
getVectorDoubleValueFnLeft
(
pLeftCol
->
pData
,
0
);
if
(
colDataIsNull_f
(
pLeftCol
->
nullbitmap
,
0
))
{
// Set pLeft->numOfRows NULL value
if
(
colDataIsNull_f
(
pLeftCol
->
nullbitmap
,
0
)
||
isnan
(
lx
)
||
isinf
(
lx
)
)
{
// Set pLeft->numOfRows NULL value
// TODO set numOfRows NULL value
}
else
{
for
(;
i
>=
0
&&
i
<
pRight
->
numOfRows
;
i
+=
step
,
output
+=
1
)
{
...
...
@@ -739,7 +739,7 @@ void vectorMathRemainder(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam
}
double
rx
=
getVectorDoubleValueFnRight
(
pRightCol
->
pData
,
i
);
if
(
compareDoubleVal
(
&
zero
,
&
rx
))
{
if
(
isnan
(
rx
)
||
isinf
(
rx
)
||
FLT_EQUAL
(
rx
,
0
))
{
colDataAppend
(
pOutputCol
,
i
,
NULL
,
true
);
continue
;
}
...
...
@@ -749,17 +749,17 @@ void vectorMathRemainder(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam
}
}
else
if
(
pRight
->
numOfRows
==
1
)
{
double
rx
=
getVectorDoubleValueFnRight
(
pRightCol
->
pData
,
0
);
if
(
colDataIsNull_f
(
pRightCol
->
nullbitmap
,
0
))
{
// Set pLeft->numOfRows NULL value
if
(
colDataIsNull_f
(
pRightCol
->
nullbitmap
,
0
)
||
FLT_EQUAL
(
rx
,
0
)
)
{
// Set pLeft->numOfRows NULL value
// TODO set numOfRows NULL value
}
else
{
for
(;
i
>=
0
&&
i
<
pLeft
->
numOfRows
;
i
+=
step
,
output
+=
1
)
{
if
(
colDataIsNull_f
(
p
Righ
tCol
->
nullbitmap
,
i
))
{
if
(
colDataIsNull_f
(
p
Lef
tCol
->
nullbitmap
,
i
))
{
colDataAppend
(
pOutputCol
,
i
,
NULL
,
true
);
continue
;
}
double
lx
=
getVectorDoubleValueFnLeft
(
p
Righ
tCol
->
pData
,
i
);
if
(
compareDoubleVal
(
&
zero
,
&
lx
))
{
double
lx
=
getVectorDoubleValueFnLeft
(
p
Lef
tCol
->
pData
,
i
);
if
(
isnan
(
lx
)
||
isinf
(
lx
))
{
colDataAppend
(
pOutputCol
,
i
,
NULL
,
true
);
continue
;
}
...
...
source/util/src/tcompare.c
浏览文件 @
a60ce69b
...
...
@@ -173,6 +173,7 @@ int32_t compareDoubleVal(const void *pLeft, const void *pRight) {
if
(
isnan
(
p2
))
{
return
1
;
}
if
(
FLT_EQUAL
(
p1
,
p2
))
{
return
0
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录