Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
847775c4
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
847775c4
编写于
3月 31, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[td-13039] refactor.
上级
387cfe8d
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
142 addition
and
172 deletion
+142
-172
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+17
-35
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-3
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+55
-75
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+69
-59
未找到文件。
source/libs/executor/inc/executil.h
浏览文件 @
847775c4
...
@@ -73,27 +73,12 @@ typedef struct SResultRowPosition {
...
@@ -73,27 +73,12 @@ typedef struct SResultRowPosition {
}
SResultRowPosition
;
}
SResultRowPosition
;
typedef
struct
SResultRowInfo
{
typedef
struct
SResultRowInfo
{
SList
*
pRows
;
SResultRowPosition
*
pPosition
;
SResultRowPosition
*
pPosition
;
SResultRow
**
pResult
;
// result list
int32_t
size
;
// number of result set
int32_t
size
;
// number of result set
int32_t
capacity
;
// max capacity
int32_t
capacity
;
// max capacity
int32_t
curPos
;
// current active result row index of pResult list
int32_t
curPos
;
// current active result row index of pResult list
}
SResultRowInfo
;
}
SResultRowInfo
;
typedef
struct
SResultRowPool
{
int32_t
elemSize
;
int32_t
blockSize
;
int32_t
numOfElemPerBlock
;
struct
{
int32_t
blockIndex
;
int32_t
pos
;
}
position
;
SArray
*
pData
;
// SArray<void*>
}
SResultRowPool
;
struct
STaskAttr
;
struct
STaskAttr
;
struct
STaskRuntimeEnv
;
struct
STaskRuntimeEnv
;
struct
SUdfInfo
;
struct
SUdfInfo
;
...
@@ -109,25 +94,33 @@ void resetResultRowInfo(struct STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo*
...
@@ -109,25 +94,33 @@ void resetResultRowInfo(struct STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo*
int32_t
numOfClosedResultRows
(
SResultRowInfo
*
pResultRowInfo
);
int32_t
numOfClosedResultRows
(
SResultRowInfo
*
pResultRowInfo
);
void
closeAllResultRows
(
SResultRowInfo
*
pResultRowInfo
);
void
closeAllResultRows
(
SResultRowInfo
*
pResultRowInfo
);
int32_t
initResultRow
(
SResultRow
*
pResultRow
);
void
initResultRow
(
SResultRow
*
pResultRow
);
void
closeResultRow
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
);
void
closeResultRow
(
SResultRow
*
pResultRow
);
bool
isResultRowClosed
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
);
bool
isResultRowClosed
(
SResultRow
*
pResultRow
);
void
clearResultRow
(
struct
STaskRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pResultRow
);
struct
SResultRowEntryInfo
*
getResultCell
(
const
SResultRow
*
pRow
,
int32_t
index
,
int32_t
*
offset
);
struct
SResultRowEntryInfo
*
getResultCell
(
const
SResultRow
*
pRow
,
int32_t
index
,
int32_t
*
offset
);
void
*
destroyQueryFuncExpr
(
SExprInfo
*
pExprInfo
,
int32_t
numOfExpr
);
int32_t
getRowNumForMultioutput
(
struct
STaskAttr
*
pQueryAttr
,
bool
topBottomQuery
,
bool
stable
);
int32_t
getRowNumForMultioutput
(
struct
STaskAttr
*
pQueryAttr
,
bool
topBottomQuery
,
bool
stable
);
static
FORCE_INLINE
SResultRow
*
getResultRow
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
)
{
static
FORCE_INLINE
SResultRow
*
getResultRow
(
SDiskbasedBuf
*
pBuf
,
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
)
{
assert
(
pResultRowInfo
!=
NULL
&&
slot
>=
0
&&
slot
<
pResultRowInfo
->
size
);
ASSERT
(
pResultRowInfo
!=
NULL
&&
slot
>=
0
&&
slot
<
pResultRowInfo
->
size
);
return
pResultRowInfo
->
pResult
[
slot
];
SResultRowPosition
*
pos
=
&
pResultRowInfo
->
pPosition
[
slot
];
SFilePage
*
bufPage
=
getBufPage
(
pBuf
,
pos
->
pageId
);
SResultRow
*
pRow
=
(
SResultRow
*
)((
char
*
)
bufPage
+
pos
->
offset
);
return
pRow
;
}
static
FORCE_INLINE
SResultRow
*
getResultRowByPos
(
SDiskbasedBuf
*
pBuf
,
SResultRowPosition
*
pos
)
{
SFilePage
*
bufPage
=
getBufPage
(
pBuf
,
pos
->
pageId
);
SResultRow
*
pRow
=
(
SResultRow
*
)((
char
*
)
bufPage
+
pos
->
offset
);
return
pRow
;
}
}
static
FORCE_INLINE
char
*
getPosInResultPage
(
struct
STaskAttr
*
pQueryAttr
,
SFilePage
*
page
,
int32_t
rowOffset
,
static
FORCE_INLINE
char
*
getPosInResultPage
(
struct
STaskAttr
*
pQueryAttr
,
SFilePage
*
page
,
int32_t
rowOffset
,
int32_t
offset
)
{
int32_t
offset
)
{
assert
(
rowOffset
>=
0
&&
pQueryAttr
!=
NULL
);
assert
(
rowOffset
>=
0
&&
pQueryAttr
!=
NULL
);
ASSERT
(
0
);
// int32_t numOfRows = (int32_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery);
// int32_t numOfRows = (int32_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery);
// return ((char *)page->data) + rowOffset + offset * numOfRows;
// return ((char *)page->data) + rowOffset + offset * numOfRows;
}
}
...
@@ -139,22 +132,11 @@ static FORCE_INLINE char* getPosInResultPage_rv(SFilePage* page, int32_t rowOffs
...
@@ -139,22 +132,11 @@ static FORCE_INLINE char* getPosInResultPage_rv(SFilePage* page, int32_t rowOffs
return
(
char
*
)
page
+
rowOffset
+
offset
*
numOfRows
;
return
(
char
*
)
page
+
rowOffset
+
offset
*
numOfRows
;
}
}
//bool isNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type);
//bool notNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type);
__filter_func_t
getFilterOperator
(
int32_t
lowerOptr
,
int32_t
upperOptr
);
SResultRow
*
getNewResultRow
(
SResultRowPool
*
p
);
typedef
struct
{
typedef
struct
{
SArray
*
pResult
;
// SArray<SResPair>
SArray
*
pResult
;
// SArray<SResPair>
int32_t
colId
;
int32_t
colId
;
}
SStddevInterResult
;
}
SStddevInterResult
;
void
interResToBinary
(
SBufferWriter
*
bw
,
SArray
*
pRes
,
int32_t
tagLen
);
SArray
*
interResFromBinary
(
const
char
*
data
,
int32_t
len
);
void
freeInterResult
(
void
*
param
);
void
initGroupResInfo
(
SGroupResInfo
*
pGroupResInfo
,
SResultRowInfo
*
pResultInfo
);
void
initGroupResInfo
(
SGroupResInfo
*
pGroupResInfo
,
SResultRowInfo
*
pResultInfo
);
void
initMultiResInfoFromArrayList
(
SGroupResInfo
*
pGroupResInfo
,
SArray
*
pArrayList
);
void
initMultiResInfoFromArrayList
(
SGroupResInfo
*
pGroupResInfo
,
SArray
*
pArrayList
);
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
847775c4
...
@@ -69,7 +69,7 @@ enum {
...
@@ -69,7 +69,7 @@ enum {
typedef
struct
SResultRowCell
{
typedef
struct
SResultRowCell
{
uint64_t
groupId
;
uint64_t
groupId
;
SResultRow
*
pRow
;
SResultRow
Position
pos
;
}
SResultRowCell
;
}
SResultRowCell
;
/**
/**
...
@@ -277,8 +277,6 @@ typedef struct STaskRuntimeEnv {
...
@@ -277,8 +277,6 @@ typedef struct STaskRuntimeEnv {
char
*
keyBuf
;
// window key buffer
char
*
keyBuf
;
// window key buffer
// The window result objects pool, all the resultRow Objects are allocated and managed by this object.
// The window result objects pool, all the resultRow Objects are allocated and managed by this object.
char
**
prevRow
;
char
**
prevRow
;
SResultRowPool
*
pool
;
SArray
*
prevResult
;
// intermediate result, SArray<SInterResult>
SArray
*
prevResult
;
// intermediate result, SArray<SInterResult>
STSBuf
*
pTsBuf
;
// timestamp filter list
STSBuf
*
pTsBuf
;
// timestamp filter list
STSCursor
cur
;
STSCursor
cur
;
...
...
source/libs/executor/src/executil.c
浏览文件 @
847775c4
...
@@ -53,15 +53,13 @@ int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) {
...
@@ -53,15 +53,13 @@ int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) {
int32_t
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
size
)
{
int32_t
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
size
)
{
pResultRowInfo
->
size
=
0
;
pResultRowInfo
->
size
=
0
;
pResultRowInfo
->
curPos
=
-
1
;
pResultRowInfo
->
curPos
=
-
1
;
pResultRowInfo
->
capacity
=
size
;
pResultRowInfo
->
capacity
=
size
;
pResultRowInfo
->
pResult
=
taosMemoryCalloc
(
pResultRowInfo
->
capacity
,
POINTER_BYTES
);
pResultRowInfo
->
pPosition
=
taosMemoryCalloc
(
pResultRowInfo
->
capacity
,
sizeof
(
SResultRowPosition
));
pResultRowInfo
->
pPosition
=
taosMemoryCalloc
(
pResultRowInfo
->
capacity
,
sizeof
(
SResultRowPosition
));
if
(
pResultRowInfo
->
pResult
==
NULL
||
pResultRowInfo
->
pPosition
==
NULL
)
{
if
(
pResultRowInfo
->
pPosition
==
NULL
)
{
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -71,17 +69,17 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) {
...
@@ -71,17 +69,17 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) {
}
}
if
(
pResultRowInfo
->
capacity
==
0
)
{
if
(
pResultRowInfo
->
capacity
==
0
)
{
assert
(
pResultRowInfo
->
pResult
==
NULL
);
//
assert(pResultRowInfo->pResult == NULL);
return
;
return
;
}
}
for
(
int32_t
i
=
0
;
i
<
pResultRowInfo
->
size
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pResultRowInfo
->
size
;
++
i
)
{
if
(
pResultRowInfo
->
pResult
[
i
])
{
//
if (pResultRowInfo->pResult[i]) {
taosMemoryFreeClear
(
pResultRowInfo
->
pResult
[
i
]
->
key
);
//
taosMemoryFreeClear(pResultRowInfo->pResult[i]->key);
}
//
}
}
}
taosMemoryFreeClear
(
pResultRowInfo
->
p
Result
);
taosMemoryFreeClear
(
pResultRowInfo
->
p
Position
);
}
}
void
resetResultRowInfo
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
)
{
void
resetResultRowInfo
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
)
{
...
@@ -90,8 +88,8 @@ void resetResultRowInfo(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRow
...
@@ -90,8 +88,8 @@ void resetResultRowInfo(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRow
}
}
for
(
int32_t
i
=
0
;
i
<
pResultRowInfo
->
size
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pResultRowInfo
->
size
;
++
i
)
{
SResultRow
*
pWindowRes
=
pResultRowInfo
->
pResult
[
i
];
//
SResultRow *pWindowRes = pResultRowInfo->pResult[i];
clearResultRow
(
pRuntimeEnv
,
pWindowRes
);
//
clearResultRow(pRuntimeEnv, pWindowRes);
int32_t
groupIndex
=
0
;
int32_t
groupIndex
=
0
;
int64_t
uid
=
0
;
int64_t
uid
=
0
;
...
@@ -101,14 +99,13 @@ void resetResultRowInfo(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRow
...
@@ -101,14 +99,13 @@ void resetResultRowInfo(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRow
}
}
pResultRowInfo
->
size
=
0
;
pResultRowInfo
->
size
=
0
;
pResultRowInfo
->
curPos
=
-
1
;
}
}
int32_t
numOfClosedResultRows
(
SResultRowInfo
*
pResultRowInfo
)
{
int32_t
numOfClosedResultRows
(
SResultRowInfo
*
pResultRowInfo
)
{
int32_t
i
=
0
;
int32_t
i
=
0
;
while
(
i
<
pResultRowInfo
->
size
&&
pResultRowInfo
->
pResult
[
i
]
->
closed
)
{
//
while (i < pResultRowInfo->size && pResultRowInfo->pResult[i]->closed) {
++
i
;
//
++i;
}
//
}
return
i
;
return
i
;
}
}
...
@@ -117,21 +114,22 @@ void closeAllResultRows(SResultRowInfo *pResultRowInfo) {
...
@@ -117,21 +114,22 @@ void closeAllResultRows(SResultRowInfo *pResultRowInfo) {
assert
(
pResultRowInfo
->
size
>=
0
&&
pResultRowInfo
->
capacity
>=
pResultRowInfo
->
size
);
assert
(
pResultRowInfo
->
size
>=
0
&&
pResultRowInfo
->
capacity
>=
pResultRowInfo
->
size
);
for
(
int32_t
i
=
0
;
i
<
pResultRowInfo
->
size
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pResultRowInfo
->
size
;
++
i
)
{
SResultRow
*
pRow
=
pResultRowInfo
->
pResult
[
i
];
// ASSERT(0);
if
(
pRow
->
closed
)
{
// SResultRow* pRow = pResultRowInfo->pResult[i];
continue
;
// if (pRow->closed) {
}
// continue;
// }
pRow
->
closed
=
true
;
//
pRow->closed = true;
}
}
}
}
bool
isResultRowClosed
(
SResultRow
Info
*
pResultRowInfo
,
int32_t
slot
)
{
bool
isResultRowClosed
(
SResultRow
*
pRow
)
{
return
(
getResultRow
(
pResultRowInfo
,
slot
)
->
closed
==
true
);
return
(
pRow
->
closed
==
true
);
}
}
void
closeResultRow
(
SResultRow
Info
*
pResultRowInfo
,
int32_t
slot
)
{
void
closeResultRow
(
SResultRow
*
pResultRow
)
{
getResultRow
(
pResultRowInfo
,
slot
)
->
closed
=
true
;
pResultRow
->
closed
=
true
;
}
}
void
clearResultRow
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pResultRow
)
{
void
clearResultRow
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pResultRow
)
{
...
@@ -181,29 +179,6 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
...
@@ -181,29 +179,6 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
return
rowSize
;
return
rowSize
;
}
}
SResultRow
*
getNewResultRow
(
SResultRowPool
*
p
)
{
if
(
p
==
NULL
)
{
return
NULL
;
}
void
*
ptr
=
NULL
;
if
(
p
->
position
.
pos
==
0
)
{
ptr
=
taosMemoryCalloc
(
1
,
p
->
blockSize
);
taosArrayPush
(
p
->
pData
,
&
ptr
);
}
else
{
size_t
last
=
taosArrayGetSize
(
p
->
pData
);
void
**
pBlock
=
taosArrayGet
(
p
->
pData
,
last
-
1
);
ptr
=
((
char
*
)
(
*
pBlock
))
+
p
->
elemSize
*
p
->
position
.
pos
;
}
p
->
position
.
pos
=
(
p
->
position
.
pos
+
1
)
%
p
->
numOfElemPerBlock
;
initResultRow
(
ptr
);
return
ptr
;
}
void
cleanupGroupResInfo
(
SGroupResInfo
*
pGroupResInfo
)
{
void
cleanupGroupResInfo
(
SGroupResInfo
*
pGroupResInfo
)
{
assert
(
pGroupResInfo
!=
NULL
);
assert
(
pGroupResInfo
!=
NULL
);
...
@@ -261,8 +236,9 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
...
@@ -261,8 +236,9 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
return
(
int32_t
)
taosArrayGetSize
(
pGroupResInfo
->
pRows
);
return
(
int32_t
)
taosArrayGetSize
(
pGroupResInfo
->
pRows
);
}
}
static
int64_t
getNumOfResultWindowRes
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pResultRow
,
int32_t
*
rowCellInfoOffset
)
{
static
int64_t
getNumOfResultWindowRes
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SResultRow
Position
*
pos
,
int32_t
*
rowCellInfoOffset
)
{
STaskAttr
*
pQueryAttr
=
pRuntimeEnv
->
pQueryAttr
;
STaskAttr
*
pQueryAttr
=
pRuntimeEnv
->
pQueryAttr
;
ASSERT
(
0
);
for
(
int32_t
j
=
0
;
j
<
pQueryAttr
->
numOfOutput
;
++
j
)
{
for
(
int32_t
j
=
0
;
j
<
pQueryAttr
->
numOfOutput
;
++
j
)
{
int32_t
functionId
=
0
;
//pQueryAttr->pExpr1[j].base.functionId;
int32_t
functionId
=
0
;
//pQueryAttr->pExpr1[j].base.functionId;
...
@@ -305,25 +281,26 @@ static int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *
...
@@ -305,25 +281,26 @@ static int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *
return
-
1
;
return
-
1
;
}
}
ASSERT
(
0
);
STableQueryInfo
**
pList
=
supporter
->
pTableQueryInfo
;
STableQueryInfo
**
pList
=
supporter
->
pTableQueryInfo
;
SResultRow
*
pWindowRes1
=
pList
[
left
]
->
resInfo
.
pResult
[
leftPos
];
//
SResultRow* pWindowRes1 = pList[left]->resInfo.pResult[leftPos];
// SResultRow * pWindowRes1 = getResultRow(&(pList[left]->resInfo), leftPos);
// SResultRow * pWindowRes1 = getResultRow(&(pList[left]->resInfo), leftPos);
TSKEY
leftTimestamp
=
pWindowRes1
->
win
.
skey
;
//
TSKEY leftTimestamp = pWindowRes1->win.skey;
// SResultRowInfo *pWindowResInfo2 = &(pList[right]->resInfo);
// SResultRowInfo *pWindowResInfo2 = &(pList[right]->resInfo);
// SResultRow * pWindowRes2 = getResultRow(pWindowResInfo2, rightPos);
// SResultRow * pWindowRes2 = getResultRow(pWindowResInfo2, rightPos);
SResultRow
*
pWindowRes2
=
pList
[
right
]
->
resInfo
.
pResult
[
rightPos
];
//
SResultRow* pWindowRes2 = pList[right]->resInfo.pResult[rightPos];
TSKEY
rightTimestamp
=
pWindowRes2
->
win
.
skey
;
//
TSKEY rightTimestamp = pWindowRes2->win.skey;
if
(
leftTimestamp
==
rightTimestamp
)
{
//
if (leftTimestamp == rightTimestamp) {
return
0
;
return
0
;
}
//
}
if
(
supporter
->
order
==
TSDB_ORDER_ASC
)
{
//
if (supporter->order == TSDB_ORDER_ASC) {
return
(
leftTimestamp
>
rightTimestamp
)
?
1
:-
1
;
//
return (leftTimestamp > rightTimestamp)? 1:-1;
}
else
{
//
} else {
return
(
leftTimestamp
<
rightTimestamp
)
?
1
:-
1
;
//
return (leftTimestamp < rightTimestamp)? 1:-1;
}
//
}
}
}
int32_t
tsAscOrder
(
const
void
*
p1
,
const
void
*
p2
)
{
int32_t
tsAscOrder
(
const
void
*
p1
,
const
void
*
p2
)
{
...
@@ -331,11 +308,12 @@ int32_t tsAscOrder(const void* p1, const void* p2) {
...
@@ -331,11 +308,12 @@ int32_t tsAscOrder(const void* p1, const void* p2) {
SResultRowCell
*
pc2
=
(
SResultRowCell
*
)
p2
;
SResultRowCell
*
pc2
=
(
SResultRowCell
*
)
p2
;
if
(
pc1
->
groupId
==
pc2
->
groupId
)
{
if
(
pc1
->
groupId
==
pc2
->
groupId
)
{
if
(
pc1
->
pRow
->
win
.
skey
==
pc2
->
pRow
->
win
.
skey
)
{
ASSERT
(
0
);
return
0
;
// if (pc1->pRow->win.skey == pc2->pRow->win.skey) {
}
else
{
// return 0;
return
(
pc1
->
pRow
->
win
.
skey
<
pc2
->
pRow
->
win
.
skey
)
?
-
1
:
1
;
// } else {
}
// return (pc1->pRow->win.skey < pc2->pRow->win.skey)? -1:1;
// }
}
else
{
}
else
{
return
(
pc1
->
groupId
<
pc2
->
groupId
)
?
-
1
:
1
;
return
(
pc1
->
groupId
<
pc2
->
groupId
)
?
-
1
:
1
;
}
}
...
@@ -346,11 +324,12 @@ int32_t tsDescOrder(const void* p1, const void* p2) {
...
@@ -346,11 +324,12 @@ int32_t tsDescOrder(const void* p1, const void* p2) {
SResultRowCell
*
pc2
=
(
SResultRowCell
*
)
p2
;
SResultRowCell
*
pc2
=
(
SResultRowCell
*
)
p2
;
if
(
pc1
->
groupId
==
pc2
->
groupId
)
{
if
(
pc1
->
groupId
==
pc2
->
groupId
)
{
if
(
pc1
->
pRow
->
win
.
skey
==
pc2
->
pRow
->
win
.
skey
)
{
ASSERT
(
0
);
return
0
;
// if (pc1->pRow->win.skey == pc2->pRow->win.skey) {
}
else
{
// return 0;
return
(
pc1
->
pRow
->
win
.
skey
<
pc2
->
pRow
->
win
.
skey
)
?
1
:-
1
;
// } else {
}
// return (pc1->pRow->win.skey < pc2->pRow->win.skey)? 1:-1;
// }
}
else
{
}
else
{
return
(
pc1
->
groupId
<
pc2
->
groupId
)
?
-
1
:
1
;
return
(
pc1
->
groupId
<
pc2
->
groupId
)
?
-
1
:
1
;
}
}
...
@@ -384,13 +363,13 @@ static int32_t mergeIntoGroupResultImplRv(STaskRuntimeEnv *pRuntimeEnv, SGroupRe
...
@@ -384,13 +363,13 @@ static int32_t mergeIntoGroupResultImplRv(STaskRuntimeEnv *pRuntimeEnv, SGroupRe
break
;
break
;
}
}
int64_t
num
=
getNumOfResultWindowRes
(
pRuntimeEnv
,
pResultRowCell
->
pRow
,
rowCellInfoOffset
);
int64_t
num
=
getNumOfResultWindowRes
(
pRuntimeEnv
,
&
pResultRowCell
->
pos
,
rowCellInfoOffset
);
if
(
num
<=
0
)
{
if
(
num
<=
0
)
{
continue
;
continue
;
}
}
taosArrayPush
(
pGroupResInfo
->
pRows
,
&
pResultRowCell
->
p
Row
);
taosArrayPush
(
pGroupResInfo
->
pRows
,
&
pResultRowCell
->
p
os
);
pResultRowCell
->
pRow
->
numOfRows
=
(
uint32_t
)
num
;
//
pResultRowCell->pRow->numOfRows = (uint32_t) num;
}
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
@@ -449,9 +428,10 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv
...
@@ -449,9 +428,10 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv
int32_t
tableIndex
=
tMergeTreeGetChosenIndex
(
pTree
);
int32_t
tableIndex
=
tMergeTreeGetChosenIndex
(
pTree
);
SResultRowInfo
*
pWindowResInfo
=
&
pTableQueryInfoList
[
tableIndex
]
->
resInfo
;
SResultRowInfo
*
pWindowResInfo
=
&
pTableQueryInfoList
[
tableIndex
]
->
resInfo
;
SResultRow
*
pWindowRes
=
getResultRow
(
pWindowResInfo
,
cs
.
rowIndex
[
tableIndex
]);
ASSERT
(
0
);
SResultRow
*
pWindowRes
=
NULL
;
//getResultRow(pBuf, pWindowResInfo, cs.rowIndex[tableIndex]);
int64_t
num
=
getNumOfResultWindowRes
(
pRuntimeEnv
,
pWindowRes
,
rowCellInfoOffset
);
int64_t
num
=
0
;
//
getNumOfResultWindowRes(pRuntimeEnv, pWindowRes, rowCellInfoOffset);
if
(
num
<=
0
)
{
if
(
num
<=
0
)
{
cs
.
rowIndex
[
tableIndex
]
+=
1
;
cs
.
rowIndex
[
tableIndex
]
+=
1
;
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
847775c4
...
@@ -426,18 +426,10 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, jmp_buf env)
...
@@ -426,18 +426,10 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, jmp_buf env)
newCapacity
+=
4
;
newCapacity
+=
4
;
}
}
char
*
t
=
taosMemoryRealloc
(
pResultRowInfo
->
pResult
,
(
size_t
)(
newCapacity
*
POINTER_BYTES
));
if
(
t
==
NULL
)
{
longjmp
(
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
pResultRowInfo
->
pPosition
=
taosMemoryRealloc
(
pResultRowInfo
->
pPosition
,
newCapacity
*
sizeof
(
SResultRowPosition
));
pResultRowInfo
->
pPosition
=
taosMemoryRealloc
(
pResultRowInfo
->
pPosition
,
newCapacity
*
sizeof
(
SResultRowPosition
));
pResultRowInfo
->
pResult
=
(
SResultRow
**
)
t
;
int32_t
inc
=
(
int32_t
)
newCapacity
-
pResultRowInfo
->
capacity
;
int32_t
inc
=
(
int32_t
)
newCapacity
-
pResultRowInfo
->
capacity
;
memset
(
&
pResultRowInfo
->
pResult
[
pResultRowInfo
->
capacity
],
0
,
POINTER_BYTES
*
inc
);
memset
(
&
pResultRowInfo
->
pPosition
[
pResultRowInfo
->
capacity
],
0
,
sizeof
(
SResultRowPosition
));
memset
(
&
pResultRowInfo
->
pPosition
[
pResultRowInfo
->
capacity
],
0
,
sizeof
(
SResultRowPosition
));
pResultRowInfo
->
capacity
=
(
int32_t
)
newCapacity
;
pResultRowInfo
->
capacity
=
(
int32_t
)
newCapacity
;
}
}
...
@@ -458,9 +450,8 @@ static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pR
...
@@ -458,9 +450,8 @@ static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pR
if
(
p1
!=
NULL
)
{
if
(
p1
!=
NULL
)
{
if
(
pResultRowInfo
->
size
==
0
)
{
if
(
pResultRowInfo
->
size
==
0
)
{
existed
=
false
;
existed
=
false
;
assert
(
pResultRowInfo
->
curPos
==
-
1
);
}
else
if
(
pResultRowInfo
->
size
==
1
)
{
}
else
if
(
pResultRowInfo
->
size
==
1
)
{
existed
=
(
pResultRowInfo
->
pResult
[
0
]
==
(
*
p1
));
//
existed = (pResultRowInfo->pResult[0] == (*p1));
}
else
{
// check if current pResultRowInfo contains the existed pResultRow
}
else
{
// check if current pResultRowInfo contains the existed pResultRow
SET_RES_EXT_WINDOW_KEY
(
pRuntimeEnv
->
keyBuf
,
pData
,
bytes
,
uid
,
pResultRowInfo
);
SET_RES_EXT_WINDOW_KEY
(
pRuntimeEnv
->
keyBuf
,
pData
,
bytes
,
uid
,
pResultRowInfo
);
int64_t
*
index
=
int64_t
*
index
=
...
@@ -479,6 +470,7 @@ static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pR
...
@@ -479,6 +470,7 @@ static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pR
return
p1
!=
NULL
;
return
p1
!=
NULL
;
}
}
#if 0
static SResultRow* doSetResultOutBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, int64_t tid,
static SResultRow* doSetResultOutBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, int64_t tid,
char* pData, int16_t bytes, bool masterscan, uint64_t tableGroupId) {
char* pData, int16_t bytes, bool masterscan, uint64_t tableGroupId) {
bool existed = false;
bool existed = false;
...
@@ -496,16 +488,16 @@ static SResultRow* doSetResultOutBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultR
...
@@ -496,16 +488,16 @@ static SResultRow* doSetResultOutBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultR
if (p1 != NULL) {
if (p1 != NULL) {
if (pResultRowInfo->size == 0) {
if (pResultRowInfo->size == 0) {
existed = false;
existed = false;
assert
(
pResultRowInfo
->
curPos
==
-
1
);
//
assert(pResultRowInfo->curPos == -1);
} else if (pResultRowInfo->size == 1) {
} else if (pResultRowInfo->size == 1) {
existed
=
(
pResultRowInfo
->
pResult
[
0
]
==
(
*
p1
));
//
existed = (pResultRowInfo->pResult[0] == (*p1));
pResultRowInfo
->
curPos
=
0
;
//
pResultRowInfo->curPos = 0;
} else { // check if current pResultRowInfo contains the existed pResultRow
} else { // check if current pResultRowInfo contains the existed pResultRow
SET_RES_EXT_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tid, pResultRowInfo);
SET_RES_EXT_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tid, pResultRowInfo);
int64_t* index =
int64_t* index =
taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes));
taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes));
if (index != NULL) {
if (index != NULL) {
pResultRowInfo
->
curPos
=
(
int32_t
)
*
index
;
//
pResultRowInfo->curPos = (int32_t)*index;
existed = true;
existed = true;
} else {
} else {
existed = false;
existed = false;
...
@@ -555,6 +547,7 @@ static SResultRow* doSetResultOutBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultR
...
@@ -555,6 +547,7 @@ static SResultRow* doSetResultOutBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultR
return pResultRowInfo->pResult[pResultRowInfo->curPos];
return pResultRowInfo->pResult[pResultRowInfo->curPos];
}
}
#endif
SResultRow
*
getNewResultRow_rv
(
SDiskbasedBuf
*
pResultBuf
,
int64_t
tableGroupId
,
int32_t
interBufSize
)
{
SResultRow
*
getNewResultRow_rv
(
SDiskbasedBuf
*
pResultBuf
,
int64_t
tableGroupId
,
int32_t
interBufSize
)
{
SFilePage
*
pData
=
NULL
;
SFilePage
*
pData
=
NULL
;
...
@@ -599,65 +592,75 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId,
...
@@ -599,65 +592,75 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId,
static
SResultRow
*
doSetResultOutBufByKey_rv
(
SDiskbasedBuf
*
pResultBuf
,
SResultRowInfo
*
pResultRowInfo
,
int64_t
tid
,
static
SResultRow
*
doSetResultOutBufByKey_rv
(
SDiskbasedBuf
*
pResultBuf
,
SResultRowInfo
*
pResultRowInfo
,
int64_t
tid
,
char
*
pData
,
int16_t
bytes
,
bool
masterscan
,
uint64_t
tableGroupId
,
char
*
pData
,
int16_t
bytes
,
bool
masterscan
,
uint64_t
tableGroupId
,
SExecTaskInfo
*
pTaskInfo
,
bool
isIntervalQuery
,
SAggSupporter
*
pSup
)
{
SExecTaskInfo
*
pTaskInfo
,
bool
isIntervalQuery
,
SAggSupporter
*
pSup
)
{
bool
exist
ed
=
false
;
bool
exist
InCurrentResusltRowInfo
=
false
;
SET_RES_WINDOW_KEY
(
pSup
->
keyBuf
,
pData
,
bytes
,
tableGroupId
);
SET_RES_WINDOW_KEY
(
pSup
->
keyBuf
,
pData
,
bytes
,
tableGroupId
);
SResultRow
**
p1
=
(
SResultRow
*
*
)
taosHashGet
(
pSup
->
pResultRowHashTable
,
pSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
SResultRow
Position
*
p1
=
(
SResultRowPosition
*
)
taosHashGet
(
pSup
->
pResultRowHashTable
,
pSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
// in case of repeat scan/reverse scan, no new time window added.
// in case of repeat scan/reverse scan, no new time window added.
if
(
isIntervalQuery
)
{
if
(
isIntervalQuery
)
{
if
(
!
masterscan
)
{
// the *p1 may be NULL in case of sliding+offset exists.
if
(
!
masterscan
)
{
// the *p1 may be NULL in case of sliding+offset exists.
return
(
p1
!=
NULL
)
?
*
p1
:
NULL
;
if
(
p1
!=
NULL
)
{
return
getResultRowByPos
(
pResultBuf
,
p1
);
}
else
{
return
NULL
;
}
}
}
if
(
p1
!=
NULL
)
{
if
(
p1
!=
NULL
)
{
if
(
pResultRowInfo
->
size
==
0
)
{
if
(
pResultRowInfo
->
size
==
0
)
{
exist
ed
=
false
;
exist
InCurrentResusltRowInfo
=
false
;
// this time window created by other timestamp that does not belongs to current table.
assert
(
pResultRowInfo
->
curPos
==
-
1
);
assert
(
pResultRowInfo
->
curPos
==
-
1
);
}
else
if
(
pResultRowInfo
->
size
==
1
)
{
}
else
if
(
pResultRowInfo
->
size
==
1
)
{
existed
=
(
pResultRowInfo
->
pResult
[
0
]
==
(
*
p1
)
);
ASSERT
(
0
);
pResultRowInfo
->
curPos
=
0
;
// existInCurrentResusltRowInfo = (pResultRowInfo->pResult[0] == (*p1))
;
}
else
{
// check if current pResultRowInfo contains the exist
ed
pResultRow
}
else
{
// check if current pResultRowInfo contains the exist
InCurrentResusltRowInfo
pResultRow
SET_RES_EXT_WINDOW_KEY
(
pSup
->
keyBuf
,
pData
,
bytes
,
tid
,
pResultRowInfo
);
SET_RES_EXT_WINDOW_KEY
(
pSup
->
keyBuf
,
pData
,
bytes
,
tid
,
pResultRowInfo
);
int64_t
*
index
=
taosHashGet
(
pSup
->
pResultRowListSet
,
pSup
->
keyBuf
,
GET_RES_EXT_WINDOW_KEY_LEN
(
bytes
));
int64_t
*
index
=
taosHashGet
(
pSup
->
pResultRowListSet
,
pSup
->
keyBuf
,
GET_RES_EXT_WINDOW_KEY_LEN
(
bytes
));
if
(
index
!=
NULL
)
{
if
(
index
!=
NULL
)
{
pResultRowInfo
->
curPos
=
(
int32_t
)
*
index
;
// TODO check the scan order for current opened time window
existed
=
true
;
// pResultRowInfo->curPos = (int32_t)*index;
existInCurrentResusltRowInfo
=
true
;
}
else
{
}
else
{
exist
ed
=
false
;
exist
InCurrentResusltRowInfo
=
false
;
}
}
}
}
}
}
}
else
{
}
else
{
// In case of group by column query, the required SResultRow object must be exist
ed
in the pResultRowInfo object.
// In case of group by column query, the required SResultRow object must be exist
InCurrentResusltRowInfo
in the pResultRowInfo object.
if
(
p1
!=
NULL
)
{
if
(
p1
!=
NULL
)
{
return
*
p1
;
return
getResultRowByPos
(
pResultBuf
,
p1
)
;
}
}
}
}
if
(
!
existed
)
{
SResultRow
*
pResult
=
NULL
;
prepareResultListBuffer
(
pResultRowInfo
,
pTaskInfo
->
env
);
if
(
!
existInCurrentResusltRowInfo
)
{
// 1. close current opened time window
if
(
pResultRowInfo
->
curPos
!=
-
1
)
{
// todo extract function
SResultRowPosition
*
pos
=
&
pResultRowInfo
->
pPosition
[
pResultRowInfo
->
curPos
];
SFilePage
*
pPage
=
getBufPage
(
pResultBuf
,
pos
->
pageId
);
SResultRow
*
pRow
=
(
SResultRow
*
)((
char
*
)
pPage
+
pos
->
offset
);
closeResultRow
(
pRow
);
releaseBufPage
(
pResultBuf
,
pPage
);
}
SResultRow
*
pResult
=
NULL
;
prepareResultListBuffer
(
pResultRowInfo
,
pTaskInfo
->
env
)
;
if
(
p1
==
NULL
)
{
if
(
p1
==
NULL
)
{
pResult
=
getNewResultRow_rv
(
pResultBuf
,
tableGroupId
,
pSup
->
resultRowSize
);
pResult
=
getNewResultRow_rv
(
pResultBuf
,
tableGroupId
,
pSup
->
resultRowSize
);
int32_t
ret
=
initResultRow
(
pResult
);
initResultRow
(
pResult
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
// add a new result set for a new group
// add a new result set for a new group
taosHashPut
(
pSup
->
pResultRowHashTable
,
pSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
),
&
pResult
,
POINTER_BYTES
);
SResultRowPosition
pos
=
{.
pageId
=
pResult
->
pageId
,
.
offset
=
pResult
->
offset
};
SResultRowCell
cell
=
{.
groupId
=
tableGroupId
,
.
pRow
=
pResult
};
taosHashPut
(
pSup
->
pResultRowHashTable
,
pSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
),
&
pos
,
POINTER_BYTES
);
SResultRowCell
cell
=
{.
groupId
=
tableGroupId
,
.
pos
=
pos
};
taosArrayPush
(
pSup
->
pResultRowArrayList
,
&
cell
);
taosArrayPush
(
pSup
->
pResultRowArrayList
,
&
cell
);
}
else
{
}
else
{
pResult
=
*
p1
;
pResult
=
getResultRowByPos
(
pResultBuf
,
p1
)
;
}
}
// 2. set the new time window to be the new active time window
pResultRowInfo
->
curPos
=
pResultRowInfo
->
size
;
pResultRowInfo
->
curPos
=
pResultRowInfo
->
size
;
pResultRowInfo
->
pPosition
[
pResultRowInfo
->
size
]
=
pResultRowInfo
->
pPosition
[
pResultRowInfo
->
size
++
]
=
(
SResultRowPosition
){.
pageId
=
pResult
->
pageId
,
.
offset
=
pResult
->
offset
};
(
SResultRowPosition
){.
pageId
=
pResult
->
pageId
,
.
offset
=
pResult
->
offset
};
pResultRowInfo
->
pResult
[
pResultRowInfo
->
size
++
]
=
pResult
;
int64_t
index
=
pResultRowInfo
->
curPos
;
int64_t
index
=
pResultRowInfo
->
curPos
;
SET_RES_EXT_WINDOW_KEY
(
pSup
->
keyBuf
,
pData
,
bytes
,
tid
,
pResultRowInfo
);
SET_RES_EXT_WINDOW_KEY
(
pSup
->
keyBuf
,
pData
,
bytes
,
tid
,
pResultRowInfo
);
...
@@ -669,7 +672,7 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR
...
@@ -669,7 +672,7 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW
);
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW
);
}
}
return
pResult
RowInfo
->
pResult
[
pResultRowInfo
->
curPos
]
;
return
pResult
;
}
}
static
void
getInitialStartTimeWindow
(
SInterval
*
pInterval
,
int32_t
precision
,
TSKEY
ts
,
STimeWindow
*
w
,
TSKEY
ekey
,
static
void
getInitialStartTimeWindow
(
SInterval
*
pInterval
,
int32_t
precision
,
TSKEY
ts
,
STimeWindow
*
w
,
TSKEY
ekey
,
...
@@ -693,7 +696,7 @@ static void getInitialStartTimeWindow(SInterval* pInterval, int32_t precision, T
...
@@ -693,7 +696,7 @@ static void getInitialStartTimeWindow(SInterval* pInterval, int32_t precision, T
}
}
// get the correct time window according to the handled timestamp
// get the correct time window according to the handled timestamp
static
STimeWindow
getActiveTimeWindow
(
SResultRowInfo
*
pResultRowInfo
,
int64_t
ts
,
SInterval
*
pInterval
,
static
STimeWindow
getActiveTimeWindow
(
S
DiskbasedBuf
*
pBuf
,
S
ResultRowInfo
*
pResultRowInfo
,
int64_t
ts
,
SInterval
*
pInterval
,
int32_t
precision
,
STimeWindow
*
win
)
{
int32_t
precision
,
STimeWindow
*
win
)
{
STimeWindow
w
=
{
0
};
STimeWindow
w
=
{
0
};
...
@@ -701,7 +704,7 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo* pResultRowInfo, int64_t t
...
@@ -701,7 +704,7 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo* pResultRowInfo, int64_t t
getInitialStartTimeWindow
(
pInterval
,
precision
,
ts
,
&
w
,
win
->
ekey
,
true
);
getInitialStartTimeWindow
(
pInterval
,
precision
,
ts
,
&
w
,
win
->
ekey
,
true
);
w
.
ekey
=
taosTimeAdd
(
w
.
skey
,
pInterval
->
interval
,
pInterval
->
intervalUnit
,
precision
)
-
1
;
w
.
ekey
=
taosTimeAdd
(
w
.
skey
,
pInterval
->
interval
,
pInterval
->
intervalUnit
,
precision
)
-
1
;
}
else
{
}
else
{
w
=
getResultRow
(
pResultRowInfo
,
pResultRowInfo
->
curPos
)
->
win
;
w
=
getResultRow
(
p
Buf
,
p
ResultRowInfo
,
pResultRowInfo
->
curPos
)
->
win
;
}
}
if
(
w
.
skey
>
ts
||
w
.
ekey
<
ts
)
{
if
(
w
.
skey
>
ts
||
w
.
ekey
<
ts
)
{
...
@@ -730,7 +733,7 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo* pResultRowInfo, int64_t t
...
@@ -730,7 +733,7 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo* pResultRowInfo, int64_t t
// get the correct time window according to the handled timestamp
// get the correct time window according to the handled timestamp
static
STimeWindow
getCurrentActiveTimeWindow
(
SResultRowInfo
*
pResultRowInfo
,
int64_t
ts
,
STaskAttr
*
pQueryAttr
)
{
static
STimeWindow
getCurrentActiveTimeWindow
(
SResultRowInfo
*
pResultRowInfo
,
int64_t
ts
,
STaskAttr
*
pQueryAttr
)
{
STimeWindow
w
=
{
0
};
STimeWindow
w
=
{
0
};
#if 0
if (pResultRowInfo->curPos == -1) { // the first window, from the previous stored value
if (pResultRowInfo->curPos == -1) { // the first window, from the previous stored value
// getInitialStartTimeWindow(pQueryAttr, ts, &w);
// getInitialStartTimeWindow(pQueryAttr, ts, &w);
...
@@ -742,7 +745,7 @@ static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo* pResultRowInfo, in
...
@@ -742,7 +745,7 @@ static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo* pResultRowInfo, in
w.ekey = w.skey + pQueryAttr->interval.interval - 1;
w.ekey = w.skey + pQueryAttr->interval.interval - 1;
}
}
} else {
} else {
w
=
getResultRow
(
pResultRowInfo
,
pResultRowInfo
->
curPos
)
->
win
;
w =
pRow
->win;
}
}
/*
/*
...
@@ -752,6 +755,7 @@ static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo* pResultRowInfo, in
...
@@ -752,6 +755,7 @@ static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo* pResultRowInfo, in
if (w.ekey > pQueryAttr->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) {
if (w.ekey > pQueryAttr->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) {
w.ekey = pQueryAttr->window.ekey;
w.ekey = pQueryAttr->window.ekey;
}
}
#endif
return
w
;
return
w
;
}
}
...
@@ -816,8 +820,8 @@ static int32_t setResultOutputBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowI
...
@@ -816,8 +820,8 @@ static int32_t setResultOutputBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowI
assert
(
win
->
skey
<=
win
->
ekey
);
assert
(
win
->
skey
<=
win
->
ekey
);
SDiskbasedBuf
*
pResultBuf
=
pRuntimeEnv
->
pResultBuf
;
SDiskbasedBuf
*
pResultBuf
=
pRuntimeEnv
->
pResultBuf
;
SResultRow
*
pResultRow
=
doSetResultOutBufByKey
(
pRuntimeEnv
,
pResultRowInfo
,
tid
,
(
char
*
)
&
win
->
skey
,
TSDB_KEYSIZE
,
SResultRow
*
pResultRow
=
NULL
;
//
doSetResultOutBufByKey(pRuntimeEnv, pResultRowInfo, tid, (char*)&win->skey, TSDB_KEYSIZE,
masterscan
,
tableGroupId
);
//
masterscan, tableGroupId);
if
(
pResultRow
==
NULL
)
{
if
(
pResultRow
==
NULL
)
{
*
pResult
=
NULL
;
*
pResult
=
NULL
;
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
@@ -909,9 +913,9 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se
...
@@ -909,9 +913,9 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se
return
forwardStep
;
return
forwardStep
;
}
}
static
void
doUpdateResultRowIndex
(
SResultRowInfo
*
pResultRowInfo
,
TSKEY
lastKey
,
bool
ascQuery
,
static
void
doUpdateResultRowIndex
(
SResultRowInfo
*
pResultRowInfo
,
TSKEY
lastKey
,
bool
ascQuery
,
bool
timeWindowInterpo
)
{
bool
timeWindowInterpo
)
{
int64_t
skey
=
TSKEY_INITIAL_VAL
;
int64_t
skey
=
TSKEY_INITIAL_VAL
;
#if 0
int32_t i = 0;
int32_t i = 0;
for (i = pResultRowInfo->size - 1; i >= 0; --i) {
for (i = pResultRowInfo->size - 1; i >= 0; --i) {
SResultRow* pResult = pResultRowInfo->pResult[i];
SResultRow* pResult = pResultRowInfo->pResult[i];
...
@@ -963,6 +967,7 @@ static void doUpdateResultRowIndex(SResultRowInfo* pResultRowInfo, TSKEY lastKey
...
@@ -963,6 +967,7 @@ static void doUpdateResultRowIndex(SResultRowInfo* pResultRowInfo, TSKEY lastKey
pResultRowInfo->curPos = i + 1; // current not closed result object
pResultRowInfo->curPos = i + 1; // current not closed result object
}
}
}
}
#endif
}
}
static
void
updateResultRowInfoActiveIndex
(
SResultRowInfo
*
pResultRowInfo
,
const
STimeWindow
*
pWin
,
TSKEY
lastKey
,
static
void
updateResultRowInfoActiveIndex
(
SResultRowInfo
*
pResultRowInfo
,
const
STimeWindow
*
pWin
,
TSKEY
lastKey
,
...
@@ -1551,14 +1556,14 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
...
@@ -1551,14 +1556,14 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
if
(
pSDataBlock
->
pDataBlock
!=
NULL
)
{
if
(
pSDataBlock
->
pDataBlock
!=
NULL
)
{
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pSDataBlock
->
pDataBlock
,
0
);
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pSDataBlock
->
pDataBlock
,
0
);
tsCols
=
(
int64_t
*
)
pColDataInfo
->
pData
;
tsCols
=
(
int64_t
*
)
pColDataInfo
->
pData
;
// assert(tsCols[0] == pSDataBlock->info.window.skey &&
// assert(tsCols[0] == pSDataBlock->info.window.skey &&
tsCols[pSDataBlock->info.rows - 1] ==
//
tsCols[pSDataBlock->info.rows - 1] ==
pSDataBlock->info.window.ekey);
// pSDataBlock->info.window.ekey);
}
}
int32_t
startPos
=
ascScan
?
0
:
(
pSDataBlock
->
info
.
rows
-
1
);
int32_t
startPos
=
ascScan
?
0
:
(
pSDataBlock
->
info
.
rows
-
1
);
TSKEY
ts
=
getStartTsKey
(
&
pSDataBlock
->
info
.
window
,
tsCols
,
pSDataBlock
->
info
.
rows
,
ascScan
);
TSKEY
ts
=
getStartTsKey
(
&
pSDataBlock
->
info
.
window
,
tsCols
,
pSDataBlock
->
info
.
rows
,
ascScan
);
STimeWindow
win
=
getActiveTimeWindow
(
pResultRowInfo
,
ts
,
&
pInfo
->
interval
,
pInfo
->
interval
.
precision
,
&
pInfo
->
win
);
STimeWindow
win
=
getActiveTimeWindow
(
p
Info
->
aggSup
.
pResultBuf
,
p
ResultRowInfo
,
ts
,
&
pInfo
->
interval
,
pInfo
->
interval
.
precision
,
&
pInfo
->
win
);
bool
masterScan
=
true
;
bool
masterScan
=
true
;
SResultRow
*
pResult
=
NULL
;
SResultRow
*
pResult
=
NULL
;
...
@@ -1581,6 +1586,8 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
...
@@ -1581,6 +1586,8 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
// prev time window not interpolation yet.
// prev time window not interpolation yet.
int32_t
curIndex
=
pResultRowInfo
->
curPos
;
int32_t
curIndex
=
pResultRowInfo
->
curPos
;
#if 0
if (prevIndex != -1 && prevIndex < curIndex && pInfo->timeWindowInterpo) {
if (prevIndex != -1 && prevIndex < curIndex && pInfo->timeWindowInterpo) {
for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already.
for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already.
SResultRow* pRes = getResultRow(pResultRowInfo, j);
SResultRow* pRes = getResultRow(pResultRowInfo, j);
...
@@ -1615,6 +1622,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
...
@@ -1615,6 +1622,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
}
}
#endif
// window start key interpolation
// window start key interpolation
doWindowBorderInterpolation
(
pOperatorInfo
,
pSDataBlock
,
pInfo
->
binfo
.
pCtx
,
pResult
,
&
win
,
startPos
,
forwardStep
,
doWindowBorderInterpolation
(
pOperatorInfo
,
pSDataBlock
,
pInfo
->
binfo
.
pCtx
,
pResult
,
&
win
,
startPos
,
forwardStep
,
...
@@ -3430,10 +3438,8 @@ void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
...
@@ -3430,10 +3438,8 @@ void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
}
}
}
}
// TODO fix this bug.
void
initResultRow
(
SResultRow
*
pResultRow
)
{
int32_t
initResultRow
(
SResultRow
*
pResultRow
)
{
pResultRow
->
pEntryInfo
=
(
struct
SResultRowEntryInfo
*
)((
char
*
)
pResultRow
+
sizeof
(
SResultRow
));
pResultRow
->
pEntryInfo
=
(
struct
SResultRowEntryInfo
*
)((
char
*
)
pResultRow
+
sizeof
(
SResultRow
));
return
TSDB_CODE_SUCCESS
;
}
}
/*
/*
...
@@ -3610,9 +3616,11 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD
...
@@ -3610,9 +3616,11 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD
SFilePage
*
bufPage
=
getBufPage
(
pBuf
,
pPos
->
pageId
);
SFilePage
*
bufPage
=
getBufPage
(
pBuf
,
pPos
->
pageId
);
SResultRow
*
pRow
=
(
SResultRow
*
)((
char
*
)
bufPage
+
pPos
->
offset
);
SResultRow
*
pRow
=
(
SResultRow
*
)((
char
*
)
bufPage
+
pPos
->
offset
);
if
(
!
isResultRowClosed
(
pResultRowInfo
,
i
))
{
continue
;
// TODO ignore the close status anyway.
}
// if (!isResultRowClosed(pRow)) {
// continue;
// }
for
(
int32_t
j
=
0
;
j
<
numOfOutput
;
++
j
)
{
for
(
int32_t
j
=
0
;
j
<
numOfOutput
;
++
j
)
{
pCtx
[
j
].
resultInfo
=
getResultCell
(
pRow
,
j
,
rowCellInfoOffset
);
pCtx
[
j
].
resultInfo
=
getResultCell
(
pRow
,
j
,
rowCellInfoOffset
);
...
@@ -3622,7 +3630,7 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD
...
@@ -3622,7 +3630,7 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD
continue
;
continue
;
}
}
if
(
pCtx
[
j
].
fpSet
.
process
)
{
// TODO set the dummy function.
if
(
pCtx
[
j
].
fpSet
.
process
)
{
// TODO set the dummy function
, to avoid the check for null ptr
.
pCtx
[
j
].
fpSet
.
finalize
(
&
pCtx
[
j
]);
pCtx
[
j
].
fpSet
.
finalize
(
&
pCtx
[
j
]);
}
}
...
@@ -4132,7 +4140,7 @@ static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutpu
...
@@ -4132,7 +4140,7 @@ static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutpu
// if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
// if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
// return;
// return;
// }
// }
#if 0
for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
SResultRow* pResult = pResultRowInfo->pResult[i];
SResultRow* pResult = pResultRowInfo->pResult[i];
...
@@ -4146,6 +4154,8 @@ static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutpu
...
@@ -4146,6 +4154,8 @@ static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutpu
pResult->numOfRows = (uint16_t)(TMAX(pResult->numOfRows, pCell->numOfRes));
pResult->numOfRows = (uint16_t)(TMAX(pResult->numOfRows, pCell->numOfRes));
}
}
}
}
#endif
}
}
static
int32_t
compressQueryColData
(
SColumnInfoData
*
pColRes
,
int32_t
numOfRows
,
char
*
data
,
int8_t
compressed
)
{
static
int32_t
compressQueryColData
(
SColumnInfoData
*
pColRes
,
int32_t
numOfRows
,
char
*
data
,
int8_t
compressed
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录