Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b95e95dc
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
b95e95dc
编写于
2月 22, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[td-13039] fix bug in sorted merge operator.
上级
6c1f75fe
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
572 addition
and
398 deletion
+572
-398
include/common/tep.h
include/common/tep.h
+6
-6
include/libs/function/function.h
include/libs/function/function.h
+3
-1
source/common/src/tep.c
source/common/src/tep.c
+35
-14
source/common/test/commonTests.cpp
source/common/test/commonTests.cpp
+1
-1
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+4
-4
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+18
-15
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+4
-4
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+362
-206
source/libs/executor/src/tsort.c
source/libs/executor/src/tsort.c
+22
-38
source/libs/executor/test/executorTests.cpp
source/libs/executor/test/executorTests.cpp
+28
-23
source/libs/executor/test/executorUtilTests.cpp
source/libs/executor/test/executorUtilTests.cpp
+85
-83
source/libs/function/src/taggfunction.c
source/libs/function/src/taggfunction.c
+3
-2
source/libs/parser/src/queryInfoUtil.c
source/libs/parser/src/queryInfoUtil.c
+1
-1
未找到文件。
include/common/tep.h
浏览文件 @
b95e95dc
...
...
@@ -64,16 +64,16 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u
}
}
#define colDataGet(p1_, r_) \
#define colDataGet
Data
(p1_, r_) \
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? (p1_)->pData + (p1_)->varmeta.offset[(r_)] \
: (p1_)->pData + ((r_) * (p1_)->info.bytes))
;
: (p1_)->pData + ((r_) * (p1_)->info.bytes))
int32_t
colDataAppend
(
SColumnInfoData
*
pColumnInfoData
,
uint32_t
currentRow
,
const
char
*
pData
,
bool
isNull
);
int32_t
colDataMergeCol
(
SColumnInfoData
*
pColumnInfoData
,
uint32_t
numOfRow1
,
const
SColumnInfoData
*
pSource
,
uint32_t
numOfRow2
);
int32_t
blockDataUpdateTsWindow
(
SSDataBlock
*
pDataBlock
);
int32_t
colDataGet
Size
(
const
SColumnInfoData
*
pColumnInfoData
,
int32_t
numOfRows
);
void
colDataTrim
(
SColumnInfoData
*
pColumnInfoData
);
int32_t
colDataGet
Length
(
const
SColumnInfoData
*
pColumnInfoData
,
int32_t
numOfRows
);
void
colDataTrim
(
SColumnInfoData
*
pColumnInfoData
);
size_t
colDataGetNumOfCols
(
const
SSDataBlock
*
pBlock
);
size_t
colDataGetNumOfRows
(
const
SSDataBlock
*
pBlock
);
...
...
@@ -92,13 +92,13 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock);
SSchema
*
blockDataExtractSchema
(
const
SSDataBlock
*
pBlock
,
int32_t
*
numOfCols
);
size_t
blockDataNumOfRowsForSerialize
(
const
SSDataBlock
*
pBlock
,
int32_t
blockSize
);
int32_t
blockDataSort
(
SSDataBlock
*
pDataBlock
,
SArray
*
pOrderInfo
,
bool
nullFirst
);
int32_t
blockDataSort_rv
(
SSDataBlock
*
pDataBlock
,
SArray
*
pOrderInfo
,
bool
nullFirst
);
int32_t
blockDataEnsureCapacity
(
SSDataBlock
*
pDataBlock
,
uint32_t
numOfRows
);
void
blockDataClearup
(
SSDataBlock
*
pDataBlock
,
bool
hasVarCol
);
SSDataBlock
*
createOneDataBlock
(
const
SSDataBlock
*
pDataBlock
);
size_t
blockDataGetCapacityInRow
(
const
SSDataBlock
*
pBlock
,
size_t
pageSize
);
void
*
blockDataDestroy
(
SSDataBlock
*
pBlock
);
#ifdef __cplusplus
...
...
include/libs/function/function.h
浏览文件 @
b95e95dc
...
...
@@ -138,8 +138,10 @@ extern SFunctionFpSet fpSet[1];
// sql function runtime context
typedef
struct
SqlFunctionCtx
{
int32_t
startRow
;
int32_t
size
;
// number of rows
void
*
pInput
;
// input data buffer
SColumnInfoData
*
pInput
;
uint32_t
order
;
// asc|desc
int16_t
inputType
;
int16_t
inputBytes
;
...
...
source/common/src/tep.c
浏览文件 @
b95e95dc
...
...
@@ -63,7 +63,7 @@ SEpSet getEpSet_s(SCorEpSet *pEpSet) {
#define BitmapLen(_n) (((_n) + ((1<<NBIT)-1)) >> NBIT)
int32_t
colDataGet
Size
(
const
SColumnInfoData
*
pColumnInfoData
,
int32_t
numOfRows
)
{
int32_t
colDataGet
Length
(
const
SColumnInfoData
*
pColumnInfoData
,
int32_t
numOfRows
)
{
ASSERT
(
pColumnInfoData
!=
NULL
);
if
(
IS_VAR_DATA_TYPE
(
pColumnInfoData
->
info
.
type
))
{
return
pColumnInfoData
->
varmeta
.
length
;
...
...
@@ -249,8 +249,8 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock) {
}
ASSERT
(
pColInfoData
->
nullbitmap
==
NULL
);
pDataBlock
->
info
.
window
.
skey
=
*
(
TSKEY
*
)
colDataGet
(
pColInfoData
,
0
);
pDataBlock
->
info
.
window
.
ekey
=
*
(
TSKEY
*
)
colDataGet
(
pColInfoData
,
(
pDataBlock
->
info
.
rows
-
1
));
pDataBlock
->
info
.
window
.
skey
=
*
(
TSKEY
*
)
colDataGet
Data
(
pColInfoData
,
0
);
pDataBlock
->
info
.
window
.
ekey
=
*
(
TSKEY
*
)
colDataGet
Data
(
pColInfoData
,
(
pDataBlock
->
info
.
rows
-
1
));
return
0
;
}
...
...
@@ -262,8 +262,8 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) {
SColumnInfoData
*
pCol2
=
taosArrayGet
(
pDest
->
pDataBlock
,
i
);
SColumnInfoData
*
pCol1
=
taosArrayGet
(
pSrc
->
pDataBlock
,
i
);
uint32_t
oldLen
=
colDataGet
Size
(
pCol2
,
pDest
->
info
.
rows
);
uint32_t
newLen
=
colDataGet
Size
(
pCol1
,
pSrc
->
info
.
rows
);
uint32_t
oldLen
=
colDataGet
Length
(
pCol2
,
pDest
->
info
.
rows
);
uint32_t
newLen
=
colDataGet
Length
(
pCol1
,
pSrc
->
info
.
rows
);
int32_t
newSize
=
oldLen
+
newLen
;
char
*
tmp
=
realloc
(
pCol2
->
pData
,
newSize
);
...
...
@@ -287,7 +287,7 @@ size_t blockDataGetSize(const SSDataBlock* pBlock) {
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
i
);
total
+=
colDataGet
Size
(
pColInfoData
,
pBlock
->
info
.
rows
);
total
+=
colDataGet
Length
(
pColInfoData
,
pBlock
->
info
.
rows
);
if
(
IS_VAR_DATA_TYPE
(
pColInfoData
->
info
.
type
))
{
total
+=
sizeof
(
int32_t
)
*
pBlock
->
info
.
rows
;
...
...
@@ -336,7 +336,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd
if
(
isNull
)
{
// do nothing
}
else
{
char
*
p
=
colDataGet
(
pColInfoData
,
j
);
char
*
p
=
colDataGet
Data
(
pColInfoData
,
j
);
size
+=
varDataTLen
(
p
);
}
...
...
@@ -401,7 +401,7 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
for
(
int32_t
j
=
startIndex
;
j
<
(
startIndex
+
rowCount
);
++
j
)
{
bool
isNull
=
colDataIsNull
(
pColData
,
pBlock
->
info
.
rows
,
j
,
pBlock
->
pBlockAgg
);
char
*
p
=
colDataGet
(
pColData
,
j
);
char
*
p
=
colDataGet
Data
(
pColData
,
j
);
colDataAppend
(
pDstCol
,
j
-
startIndex
,
p
,
isNull
);
}
...
...
@@ -443,7 +443,7 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
pStart
+=
BitmapLen
(
pBlock
->
info
.
rows
);
}
uint32_t
dataSize
=
colDataGet
Size
(
pCol
,
numOfRows
);
uint32_t
dataSize
=
colDataGet
Length
(
pCol
,
numOfRows
);
*
(
int32_t
*
)
pStart
=
dataSize
;
pStart
+=
sizeof
(
int32_t
);
...
...
@@ -592,8 +592,8 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
}
}
void
*
left1
=
colDataGet
(
pColInfoData
,
left
);
void
*
right1
=
colDataGet
(
pColInfoData
,
right
);
void
*
left1
=
colDataGet
Data
(
pColInfoData
,
left
);
void
*
right1
=
colDataGet
Data
(
pColInfoData
,
right
);
switch
(
pColInfoData
->
info
.
type
)
{
case
TSDB_DATA_TYPE_INT
:
{
...
...
@@ -632,7 +632,7 @@ static int32_t doAssignOneTuple(SColumnInfoData* pDstCols, int32_t numOfRows, co
return
code
;
}
}
else
{
char
*
p
=
colDataGet
(
pSrc
,
tupleIndex
);
char
*
p
=
colDataGet
Data
(
pSrc
,
tupleIndex
);
code
=
colDataAppend
(
pDst
,
numOfRows
,
p
,
false
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
@@ -971,8 +971,8 @@ int32_t dataBlockCompar_rv(const void* p1, const void* p2, const void* param) {
// }
// }
// void* left1 = colDataGet(pColInfoData, left);
// void* right1 = colDataGet(pColInfoData, right);
// void* left1 = colDataGet
Data
(pColInfoData, left);
// void* right1 = colDataGet
Data
(pColInfoData, right);
// switch(pColInfoData->info.type) {
// case TSDB_DATA_TYPE_INT: {
...
...
@@ -1113,4 +1113,25 @@ void* blockDataDestroy(SSDataBlock* pBlock) {
tfree
(
pBlock
->
pBlockAgg
);
tfree
(
pBlock
);
return
NULL
;
}
SSDataBlock
*
createOneDataBlock
(
const
SSDataBlock
*
pDataBlock
)
{
int32_t
numOfCols
=
pDataBlock
->
info
.
numOfCols
;
SSDataBlock
*
pBlock
=
calloc
(
1
,
sizeof
(
SSDataBlock
));
pBlock
->
pDataBlock
=
taosArrayInit
(
numOfCols
,
sizeof
(
SColumnInfoData
));
pBlock
->
info
.
numOfCols
=
numOfCols
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
colInfo
=
{
0
};
SColumnInfoData
*
p
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
i
);
colInfo
.
info
=
p
->
info
;
taosArrayPush
(
pBlock
->
pDataBlock
,
&
colInfo
);
}
return
pBlock
;
}
size_t
blockDataGetCapacityInRow
(
const
SSDataBlock
*
pBlock
,
size_t
pageSize
)
{
return
pageSize
/
(
blockDataGetSerialRowSize
(
pBlock
)
+
blockDataGetSerialMetaSize
(
pBlock
));
}
\ No newline at end of file
source/common/test/commonTests.cpp
浏览文件 @
b95e95dc
...
...
@@ -162,7 +162,7 @@ TEST(testCase, Datablock_test) {
ASSERT_EQ
(
colDataGetNumOfCols
(
b
),
2
);
ASSERT_EQ
(
colDataGetNumOfRows
(
b
),
40
);
char
*
pData
=
colDataGet
(
p1
,
3
);
char
*
pData
=
colDataGet
Data
(
p1
,
3
);
printf
(
"the second row of binary:%s, length:%d
\n
"
,
(
char
*
)
varDataVal
(
pData
),
varDataLen
(
pData
));
SArray
*
pOrderInfo
=
taosArrayInit
(
3
,
sizeof
(
SBlockOrderInfo
));
...
...
source/libs/executor/inc/executil.h
浏览文件 @
b95e95dc
...
...
@@ -69,8 +69,8 @@ typedef struct SResultRow {
typedef
struct
SResultRowInfo
{
SResultRow
**
pResult
;
// result list
int16_t
type
:
8
;
// data type for hash key
int32_t
size
:
24
;
// number of result set
//
int16_t type:8; // data type for hash key
int32_t
size
;
// number of result set
int32_t
capacity
;
// max capacity
int32_t
curPos
;
// current active result row index of pResult list
}
SResultRowInfo
;
...
...
@@ -95,7 +95,7 @@ struct SUdfInfo;
int32_t
getOutputInterResultBufSize
(
struct
STaskAttr
*
pQueryAttr
);
size_t
getResultRowSize
(
SArray
*
pExprInfo
);
int32_t
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
size
,
int16_t
type
);
int32_t
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
size
);
void
cleanupResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
);
void
resetResultRowInfo
(
struct
STaskRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
);
...
...
@@ -105,7 +105,7 @@ void closeAllResultRows(SResultRowInfo* pResultRowInfo);
int32_t
initResultRow
(
SResultRow
*
pResultRow
);
void
closeResultRow
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
);
bool
isResultRowClosed
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
);
void
clearResultRow
(
struct
STaskRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pResultRow
,
int16_t
type
);
void
clearResultRow
(
struct
STaskRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pResultRow
);
struct
SResultRowEntryInfo
*
getResultCell
(
const
SResultRow
*
pRow
,
int32_t
index
,
int32_t
*
offset
);
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
b95e95dc
...
...
@@ -445,16 +445,20 @@ typedef struct SOptrBasicInfo {
int32_t
capacity
;
}
SOptrBasicInfo
;
typedef
struct
SOptrBasicInfo
STableIntervalOperatorInfo
;
typedef
struct
SAggOperatorInfo
{
SOptrBasicInfo
binfo
;
SDiskbasedBuf
*
pResultBuf
;
// query result buffer based on blocked-wised disk file
typedef
struct
SAggSupporter
{
SHashObj
*
pResultRowHashTable
;
// quick locate the window object for each result
SHashObj
*
pResultRowListSet
;
// used to check if current ResultRowInfo has ResultRow object or not
SArray
*
pResultRowArrayList
;
// The array list that contains the Result rows
char
*
keyBuf
;
// window key buffer
SResultRowPool
*
pool
;
// The window result objects pool, all the resultRow Objects are allocated and managed by this object.
}
SAggSupporter
;
typedef
struct
SOptrBasicInfo
STableIntervalOperatorInfo
;
typedef
struct
SAggOperatorInfo
{
SOptrBasicInfo
binfo
;
SDiskbasedBuf
*
pResultBuf
;
// query result buffer based on blocked-wised disk file
SAggSupporter
aggSup
;
STableQueryInfo
*
current
;
uint32_t
groupId
;
SGroupResInfo
groupResInfo
;
...
...
@@ -552,8 +556,6 @@ typedef struct SDistinctOperatorInfo {
typedef
struct
SSortedMergeOperatorInfo
{
SOptrBasicInfo
binfo
;
// SSDataBlock *pDataBlock;
bool
hasVarCol
;
SArray
*
orderInfo
;
// SArray<SBlockOrderInfo>
...
...
@@ -564,12 +566,16 @@ typedef struct SSortedMergeOperatorInfo {
int32_t
bufPageSize
;
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
int32_t
numOfRowsInRes
;
char
**
prevRow
;
int32_t
resultRowFactor
;
bool
multiGroupResults
;
bool
hasGroupColData
;
bool
hasGroupVal
;
SDiskbasedBuf
*
pTupleStore
;
// keep the final results
int32_t
numOfResPerPage
;
char
**
groupVal
;
SArray
*
groupInfo
;
SAggSupporter
aggSup
;
}
SSortedMergeOperatorInfo
;
typedef
struct
SOrderOperatorInfo
{
...
...
@@ -634,7 +640,7 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo
*
createJoinOperatorInfo
(
SOperatorInfo
**
pdownstream
,
int32_t
numOfDownstream
,
SSchema
*
pSchema
,
int32_t
numOfOutput
);
SOperatorInfo
*
createOrderOperatorInfo
(
SOperatorInfo
*
downstream
,
SArray
*
pExprInfo
,
SArray
*
pOrderVal
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSortedMergeOperatorInfo
(
SOperatorInfo
**
downstream
,
int32_t
numOfDownstream
,
SArray
*
pExprInfo
,
SArray
*
pOrderVal
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSortedMergeOperatorInfo
(
SOperatorInfo
**
downstream
,
int32_t
numOfDownstream
,
SArray
*
pExprInfo
,
SArray
*
pOrderVal
,
S
Array
*
pGroupInfo
,
S
ExecTaskInfo
*
pTaskInfo
);
// SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
// SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
...
...
@@ -682,9 +688,6 @@ int32_t checkForQueryBuf(size_t numOfTables);
bool
checkNeedToCompressQueryCol
(
SQInfo
*
pQInfo
);
void
setQueryStatus
(
STaskRuntimeEnv
*
pRuntimeEnv
,
int8_t
status
);
bool
onlyQueryTags
(
STaskAttr
*
pQueryAttr
);
// void destroyUdfInfo(struct SUdfInfo* pUdfInfo);
int32_t
doDumpQueryResult
(
SQInfo
*
pQInfo
,
char
*
data
,
int8_t
compressed
,
int32_t
*
compLen
);
size_t
getResultSize
(
SQInfo
*
pQInfo
,
int64_t
*
numOfRows
);
...
...
source/libs/executor/src/executil.c
浏览文件 @
b95e95dc
...
...
@@ -53,8 +53,8 @@ int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) {
return
size
;
}
int32_t
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
size
,
int16_t
type
)
{
pResultRowInfo
->
type
=
type
;
int32_t
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
size
)
{
//
pResultRowInfo->type = type;
pResultRowInfo
->
size
=
0
;
pResultRowInfo
->
curPos
=
-
1
;
pResultRowInfo
->
capacity
=
size
;
...
...
@@ -93,7 +93,7 @@ void resetResultRowInfo(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRow
for
(
int32_t
i
=
0
;
i
<
pResultRowInfo
->
size
;
++
i
)
{
SResultRow
*
pWindowRes
=
pResultRowInfo
->
pResult
[
i
];
clearResultRow
(
pRuntimeEnv
,
pWindowRes
,
pResultRowInfo
->
type
);
clearResultRow
(
pRuntimeEnv
,
pWindowRes
);
int32_t
groupIndex
=
0
;
int64_t
uid
=
0
;
...
...
@@ -136,7 +136,7 @@ void closeResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) {
getResultRow
(
pResultRowInfo
,
slot
)
->
closed
=
true
;
}
void
clearResultRow
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pResultRow
,
int16_t
type
)
{
void
clearResultRow
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pResultRow
)
{
if
(
pResultRow
==
NULL
)
{
return
;
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
b95e95dc
...
...
@@ -12,14 +12,14 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <tep.h>
#include <tsort.h>
#include "exception.h"
#include "os.h"
#include "tep.h"
#include "tsort.h"
#include "exception.h"
#include "parser.h"
#include "tglobal.h"
#include "tmsg.h"
#include "tq.h"
#include "ttime.h"
#include "executorimpl.h"
...
...
@@ -381,12 +381,13 @@ static bool hasNull(SColumn* pColumn, SColumnDataAgg *pStatis) {
}
static
void
prepareResultListBuffer
(
SResultRowInfo
*
pResultRowInfo
,
jmp_buf
env
)
{
int64_t
newCapacity
=
0
;
// more than the capacity, reallocate the resources
if
(
pResultRowInfo
->
size
<
pResultRowInfo
->
capacity
)
{
return
;
}
int64_t
newCapacity
=
0
;
if
(
pResultRowInfo
->
capacity
>
10000
)
{
newCapacity
=
(
int64_t
)(
pResultRowInfo
->
capacity
*
1
.
25
);
}
else
{
...
...
@@ -519,12 +520,12 @@ static SResultRow* doSetResultOutBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultR
}
static
SResultRow
*
doSetResultOutBufByKey_rv
(
SResultRowInfo
*
pResultRowInfo
,
int64_t
tid
,
char
*
pData
,
int16_t
bytes
,
bool
masterscan
,
uint64_t
tableGroupId
,
SExecTaskInfo
*
pTaskInfo
,
bool
isIntervalQuery
,
SAgg
OperatorInfo
*
pAggInfo
)
{
bool
masterscan
,
uint64_t
tableGroupId
,
SExecTaskInfo
*
pTaskInfo
,
bool
isIntervalQuery
,
SAgg
Supporter
*
pSup
)
{
bool
existed
=
false
;
SET_RES_WINDOW_KEY
(
p
AggInfo
->
keyBuf
,
pData
,
bytes
,
tableGroupId
);
SET_RES_WINDOW_KEY
(
p
Sup
->
keyBuf
,
pData
,
bytes
,
tableGroupId
);
SResultRow
**
p1
=
(
SResultRow
**
)
taosHashGet
(
p
AggInfo
->
pResultRowHashTable
,
pAggInfo
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
(
SResultRow
**
)
taosHashGet
(
p
Sup
->
pResultRowHashTable
,
pSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
));
// in case of repeat scan/reverse scan, no new time window added.
if
(
isIntervalQuery
)
{
...
...
@@ -540,8 +541,8 @@ static SResultRow* doSetResultOutBufByKey_rv(SResultRowInfo* pResultRowInfo, int
existed
=
(
pResultRowInfo
->
pResult
[
0
]
==
(
*
p1
));
pResultRowInfo
->
curPos
=
0
;
}
else
{
// check if current pResultRowInfo contains the existed pResultRow
SET_RES_EXT_WINDOW_KEY
(
p
AggInfo
->
keyBuf
,
pData
,
bytes
,
tid
,
pResultRowInfo
);
int64_t
*
index
=
taosHashGet
(
p
AggInfo
->
pResultRowListSet
,
pAggInfo
->
keyBuf
,
GET_RES_EXT_WINDOW_KEY_LEN
(
bytes
));
SET_RES_EXT_WINDOW_KEY
(
p
Sup
->
keyBuf
,
pData
,
bytes
,
tid
,
pResultRowInfo
);
int64_t
*
index
=
taosHashGet
(
p
Sup
->
pResultRowListSet
,
pSup
->
keyBuf
,
GET_RES_EXT_WINDOW_KEY_LEN
(
bytes
));
if
(
index
!=
NULL
)
{
pResultRowInfo
->
curPos
=
(
int32_t
)
*
index
;
existed
=
true
;
...
...
@@ -562,16 +563,16 @@ static SResultRow* doSetResultOutBufByKey_rv(SResultRowInfo* pResultRowInfo, int
SResultRow
*
pResult
=
NULL
;
if
(
p1
==
NULL
)
{
pResult
=
getNewResultRow
(
p
AggInfo
->
pool
);
pResult
=
getNewResultRow
(
p
Sup
->
pool
);
int32_t
ret
=
initResultRow
(
pResult
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
// add a new result set for a new group
taosHashPut
(
p
AggInfo
->
pResultRowHashTable
,
pAggInfo
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
),
&
pResult
,
POINTER_BYTES
);
taosHashPut
(
p
Sup
->
pResultRowHashTable
,
pSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
bytes
),
&
pResult
,
POINTER_BYTES
);
SResultRowCell
cell
=
{.
groupId
=
tableGroupId
,
.
pRow
=
pResult
};
taosArrayPush
(
p
AggInfo
->
pResultRowArrayList
,
&
cell
);
taosArrayPush
(
p
Sup
->
pResultRowArrayList
,
&
cell
);
}
else
{
pResult
=
*
p1
;
}
...
...
@@ -580,8 +581,8 @@ static SResultRow* doSetResultOutBufByKey_rv(SResultRowInfo* pResultRowInfo, int
pResultRowInfo
->
pResult
[
pResultRowInfo
->
size
++
]
=
pResult
;
int64_t
index
=
pResultRowInfo
->
curPos
;
SET_RES_EXT_WINDOW_KEY
(
p
AggInfo
->
keyBuf
,
pData
,
bytes
,
tid
,
pResultRowInfo
);
taosHashPut
(
p
AggInfo
->
pResultRowListSet
,
pAggInfo
->
keyBuf
,
GET_RES_EXT_WINDOW_KEY_LEN
(
bytes
),
&
index
,
POINTER_BYTES
);
SET_RES_EXT_WINDOW_KEY
(
p
Sup
->
keyBuf
,
pData
,
bytes
,
tid
,
pResultRowInfo
);
taosHashPut
(
p
Sup
->
pResultRowListSet
,
pSup
->
keyBuf
,
GET_RES_EXT_WINDOW_KEY_LEN
(
bytes
),
&
index
,
POINTER_BYTES
);
}
// too many time window in query
...
...
@@ -933,11 +934,11 @@ static void doApplyFunctions(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx,
pCtx
[
k
].
startTs
=
pWin
->
skey
;
// keep it temporarialy
char
*
start
=
pCtx
[
k
].
pInput
;
char
*
start
=
NULL
;
//
pCtx[k].pInput;
int32_t
pos
=
(
QUERY_IS_ASC_QUERY
(
pQueryAttr
))
?
offset
:
offset
-
(
forwardStep
-
1
);
if
(
pCtx
[
k
].
pInput
!=
NULL
)
{
pCtx
[
k
].
pInput
=
(
char
*
)
pCtx
[
k
].
pInput
+
pos
*
pCtx
[
k
].
inputBytes
;
//
pCtx[k].pInput = (char *)pCtx[k].pInput + pos * pCtx[k].inputBytes;
}
if
(
tsCol
!=
NULL
)
{
...
...
@@ -956,7 +957,7 @@ static void doApplyFunctions(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx,
// restore it
pCtx
[
k
].
isAggSet
=
hasAggregates
;
pCtx
[
k
].
pInput
=
start
;
//
pCtx[k].pInput = start;
}
}
...
...
@@ -1152,7 +1153,7 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx,
SColumnInfoData
*
p
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pCtx
[
i
].
columnIndex
);
// in case of the block distribution query, the inputBytes is not a constant value.
pCtx
[
i
].
pInput
=
p
->
pData
;
pCtx
[
i
].
pInput
=
p
;
assert
(
p
->
info
.
colId
==
pCol
->
info
.
colId
);
if
(
pCtx
[
i
].
functionId
<
0
)
{
...
...
@@ -1164,14 +1165,14 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx,
// uint32_t status = aAggs[pCtx[i].functionId].status;
// if ((status & (FUNCSTATE_SELECTIVITY | FUNCSTATE_NEED_TS)) != 0) {
SColumnInfoData
*
tsInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
0
);
//
SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0);
// In case of the top/bottom query again the nest query result, which has no timestamp column
// don't set the ptsList attribute.
if
(
tsInfo
->
info
.
type
==
TSDB_DATA_TYPE_TIMESTAMP
)
{
pCtx
[
i
].
ptsList
=
(
int64_t
*
)
tsInfo
->
pData
;
}
else
{
pCtx
[
i
].
ptsList
=
NULL
;
}
//
if (tsInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
//
pCtx[i].ptsList = (int64_t*) tsInfo->pData;
//
} else {
//
pCtx[i].ptsList = NULL;
//
}
// }
// } else if (TSDB_COL_IS_UD_COL(pCol->flag) && (pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) {
// SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo;
...
...
@@ -2341,29 +2342,6 @@ static bool isCachedLastQuery(STaskAttr *pQueryAttr) {
return
true
;
}
/**
* The following 4 kinds of query are treated as the tags query
* tagprj, tid_tag query, count(tbname), 'abc' (user defined constant value column) query
*/
bool
onlyQueryTags
(
STaskAttr
*
pQueryAttr
)
{
for
(
int32_t
i
=
0
;
i
<
pQueryAttr
->
numOfOutput
;
++
i
)
{
SExprInfo
*
pExprInfo
=
&
pQueryAttr
->
pExpr1
[
i
];
int32_t
functionId
=
getExprFunctionId
(
pExprInfo
);
if
(
functionId
!=
FUNCTION_TAGPRJ
&&
functionId
!=
FUNCTION_TID_TAG
&&
(
!
(
functionId
==
FUNCTION_COUNT
&&
pExprInfo
->
base
.
pColumns
->
info
.
colId
==
TSDB_TBNAME_COLUMN_INDEX
))
&&
(
!
(
functionId
==
FUNCTION_PRJ
&&
TSDB_COL_IS_UD_COL
(
pExprInfo
->
base
.
pColumns
->
flag
))))
{
return
false
;
}
}
return
true
;
}
/////////////////////////////////////////////////////////////////////////////////////////////
void
getAlignQueryTimeWindow
(
STaskAttr
*
pQueryAttr
,
int64_t
key
,
int64_t
keyFirst
,
int64_t
keyLast
,
STimeWindow
*
win
)
{
...
...
@@ -2864,8 +2842,6 @@ void filterColRowsInDataBlock(STaskRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock,
tfree
(
p
);
}
static
SColumnInfo
*
doGetTagColumnInfoById
(
SColumnInfo
*
pTagColList
,
int32_t
numOfTags
,
int16_t
colId
);
static
void
doSetTagValueInParam
(
void
*
pTable
,
int32_t
tagColId
,
SVariant
*
tag
,
int16_t
type
,
int16_t
bytes
);
...
...
@@ -3380,10 +3356,8 @@ void setDefaultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, in
initCtxOutputBuffer
(
pCtx
,
pDataBlock
->
info
.
numOfCols
);
}
void
setDefaultOutputBuf_rv
(
SAggOperatorInfo
*
pAggInfo
,
int32_t
stage
,
SExecTaskInfo
*
pTaskInfo
)
{
SOptrBasicInfo
*
pInfo
=
&
pAggInfo
->
binfo
;
// TODO refactor: some function move away
void
setDefaultOutputBuf_rv
(
SOptrBasicInfo
*
pInfo
,
SAggSupporter
*
pSup
,
int32_t
stage
,
SExecTaskInfo
*
pTaskInfo
)
{
SqlFunctionCtx
*
pCtx
=
pInfo
->
pCtx
;
SSDataBlock
*
pDataBlock
=
pInfo
->
pRes
;
int32_t
*
rowCellInfoOffset
=
pInfo
->
rowCellInfoOffset
;
...
...
@@ -3391,9 +3365,7 @@ void setDefaultOutputBuf_rv(SAggOperatorInfo* pAggInfo, int32_t stage, SExecTask
int64_t
tid
=
0
;
int64_t
groupId
=
0
;
pAggInfo
->
keyBuf
=
realloc
(
pAggInfo
->
keyBuf
,
sizeof
(
tid
)
+
sizeof
(
int64_t
)
+
POINTER_BYTES
);
SResultRow
*
pRow
=
doSetResultOutBufByKey_rv
(
pResultRowInfo
,
tid
,
(
char
*
)
&
tid
,
sizeof
(
tid
),
true
,
groupId
,
pTaskInfo
,
false
,
pAggInfo
);
SResultRow
*
pRow
=
doSetResultOutBufByKey_rv
(
pResultRowInfo
,
tid
,
(
char
*
)
&
tid
,
sizeof
(
tid
),
true
,
groupId
,
pTaskInfo
,
false
,
pSup
);
for
(
int32_t
i
=
0
;
i
<
pDataBlock
->
info
.
numOfCols
;
++
i
)
{
SColumnInfoData
*
pData
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
i
);
...
...
@@ -3606,7 +3578,7 @@ STableQueryInfo *createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow
// set more initial size of interval/groupby query
// if (/*QUERY_IS_INTERVAL_QUERY(pQueryAttr) || */groupbyColumn) {
int32_t
initialSize
=
128
;
int32_t
code
=
initResultRowInfo
(
&
pTableQueryInfo
->
resInfo
,
initialSize
,
TSDB_DATA_TYPE_INT
);
int32_t
code
=
initResultRowInfo
(
&
pTableQueryInfo
->
resInfo
,
initialSize
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
NULL
;
}
...
...
@@ -3624,7 +3596,7 @@ STableQueryInfo* createTmpTableQueryInfo(STimeWindow win) {
// set more initial size of interval/groupby query
int32_t
initialSize
=
16
;
int32_t
code
=
initResultRowInfo
(
&
pTableQueryInfo
->
resInfo
,
initialSize
,
TSDB_DATA_TYPE_INT
);
int32_t
code
=
initResultRowInfo
(
&
pTableQueryInfo
->
resInfo
,
initialSize
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tfree
(
pTableQueryInfo
);
return
NULL
;
...
...
@@ -3717,7 +3689,7 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, i
int32_t
*
rowCellInfoOffset
=
pAggInfo
->
binfo
.
rowCellInfoOffset
;
SResultRow
*
pResultRow
=
doSetResultOutBufByKey_rv
(
pResultRowInfo
,
tid
,
(
char
*
)
&
tableGroupId
,
sizeof
(
tableGroupId
),
true
,
uid
,
pTaskInfo
,
false
,
pAggInfo
);
doSetResultOutBufByKey_rv
(
pResultRowInfo
,
tid
,
(
char
*
)
&
tableGroupId
,
sizeof
(
tableGroupId
),
true
,
uid
,
pTaskInfo
,
false
,
&
pAggInfo
->
aggSup
);
assert
(
pResultRow
!=
NULL
);
/*
...
...
@@ -4521,13 +4493,19 @@ void queryCostStatis(SExecTaskInfo *pTaskInfo) {
// return true;
//}
void
appendDownstream
(
SOperatorInfo
*
p
,
SOperatorInfo
*
pDownstrea
m
)
{
int32_t
appendDownstream
(
SOperatorInfo
*
p
,
SOperatorInfo
**
pDownstream
,
int32_t
nu
m
)
{
if
(
p
->
pDownstream
==
NULL
)
{
assert
(
p
->
numOfDownstream
==
0
);
}
p
->
pDownstream
=
realloc
(
p
->
pDownstream
,
POINTER_BYTES
*
(
p
->
numOfDownstream
+
1
));
p
->
pDownstream
[
p
->
numOfDownstream
++
]
=
pDownstream
;
p
->
pDownstream
=
calloc
(
1
,
num
*
POINTER_BYTES
);
if
(
p
->
pDownstream
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
memcpy
(
p
->
pDownstream
,
pDownstream
,
num
*
POINTER_BYTES
);
p
->
numOfDownstream
=
num
;
return
TSDB_CODE_SUCCESS
;
}
static
void
doDestroyTableQueryInfo
(
STableGroupInfo
*
pTableqinfoGroupInfo
);
...
...
@@ -5599,11 +5577,20 @@ SArray* getResultGroupCheckColumns(STaskAttr* pQuery) {
return
pOrderColumns
;
}
static
int32_t
initAggSup
(
SAggSupporter
*
pAggSup
,
SArray
*
pExprInfo
);
static
void
clearupAggSup
(
SAggSupporter
*
pAggSup
);
static
void
destroySortedMergeOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SSortedMergeOperatorInfo
*
pInfo
=
(
SSortedMergeOperatorInfo
*
)
param
;
taosArrayDestroy
(
pInfo
->
orderInfo
);
destroySortHandle
(
pInfo
->
pSortHandle
);
taosArrayDestroy
(
pInfo
->
groupInfo
);
if
(
pInfo
->
pSortHandle
!=
NULL
)
{
destroySortHandle
(
pInfo
->
pSortHandle
);
}
blockDataDestroy
(
pInfo
->
binfo
.
pRes
);
clearupAggSup
(
&
pInfo
->
aggSup
);
}
static
void
destroySlimitOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
...
...
@@ -5613,11 +5600,12 @@ static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
tfree
(
pInfo
->
prevRow
);
}
static
SExprInfo
*
exprArrayDup
(
SArray
*
pExprInfo
)
{
size_t
numOfOutput
=
taosArrayGetSize
(
pExprInfo
);
static
SExprInfo
*
exprArrayDup
(
SArray
*
pExprList
)
{
size_t
numOfOutput
=
taosArrayGetSize
(
pExprList
);
SExprInfo
*
p
=
calloc
(
numOfOutput
,
sizeof
(
SExprInfo
));
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pExprInfo
)
;
++
i
)
{
SExprInfo
*
pExpr
=
taosArrayGetP
(
pExpr
Info
,
i
);
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
SExprInfo
*
pExpr
=
taosArrayGetP
(
pExpr
List
,
i
);
assignExprInfo
(
&
p
[
i
],
pExpr
);
}
...
...
@@ -5666,6 +5654,173 @@ SSDataBlock* loadNextDataBlock(void* param) {
return
pOperator
->
exec
(
pOperator
,
&
newgroup
);
}
static
bool
needToMerge
(
SSDataBlock
*
pBlock
,
SArray
*
groupInfo
,
char
**
buf
,
int32_t
rowIndex
)
{
size_t
size
=
taosArrayGetSize
(
groupInfo
);
if
(
size
==
0
)
{
return
true
;
}
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
int32_t
*
index
=
taosArrayGet
(
groupInfo
,
i
);
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
*
index
);
bool
isNull
=
colDataIsNull
(
pColInfo
,
rowIndex
,
pBlock
->
info
.
rows
,
NULL
);
if
((
isNull
&&
buf
[
i
]
!=
NULL
)
||
(
!
isNull
&&
buf
[
i
]
==
NULL
))
{
return
false
;
}
char
*
pCell
=
colDataGetData
(
pColInfo
,
rowIndex
);
if
(
IS_VAR_DATA_TYPE
(
pColInfo
->
info
.
type
))
{
if
(
varDataLen
(
pCell
)
!=
varDataLen
(
buf
[
i
]))
{
return
false
;
}
else
{
if
(
memcmp
(
varDataVal
(
pCell
),
varDataVal
(
buf
[
i
]),
varDataLen
(
pCell
))
!=
0
)
{
return
false
;
}
}
}
else
{
if
(
memcmp
(
pCell
,
buf
[
i
],
pColInfo
->
info
.
bytes
)
!=
0
)
{
return
false
;
}
}
}
return
0
;
}
static
void
doMergeResultImpl
(
SSortedMergeOperatorInfo
*
pInfo
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfExpr
,
int32_t
rowIndex
)
{
for
(
int32_t
j
=
0
;
j
<
numOfExpr
;
++
j
)
{
// TODO set row index
pCtx
[
j
].
startRow
=
rowIndex
;
}
for
(
int32_t
j
=
0
;
j
<
numOfExpr
;
++
j
)
{
int32_t
functionId
=
pCtx
[
j
].
functionId
;
pCtx
[
j
].
fpSet
->
addInput
(
&
pCtx
[
j
]);
// if (functionId < 0) {
// SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
// doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
// } else {
// assert(!TSDB_FUNC_IS_SCALAR(functionId));
// aAggs[functionId].mergeFunc(&pCtx[j]);
// }
}
}
static
void
doFinalizeResultImpl
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfExpr
)
{
for
(
int32_t
j
=
0
;
j
<
numOfExpr
;
++
j
)
{
int32_t
functionId
=
pCtx
[
j
].
functionId
;
// if (functionId == FUNC_TAG_DUMMY || functionId == FUNC_TS_DUMMY) {
// continue;
// }
// if (functionId < 0) {
// SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
// doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
// } else {
pCtx
[
j
].
fpSet
->
addInput
(
&
pCtx
[
j
]);
}
}
static
bool
saveCurrentTuple
(
char
**
rowColData
,
SArray
*
pColumnList
,
SSDataBlock
*
pBlock
,
int32_t
rowIndex
)
{
int32_t
size
=
(
int32_t
)
taosArrayGetSize
(
pColumnList
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
int32_t
*
index
=
taosArrayGet
(
pColumnList
,
i
);
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
*
index
);
char
*
data
=
colDataGetData
(
pColInfo
,
rowIndex
);
memcpy
(
rowColData
[
i
],
data
,
colDataGetLength
(
pColInfo
,
rowIndex
));
}
return
true
;
}
static
void
doMergeImpl
(
SOperatorInfo
*
pOperator
,
int32_t
numOfExpr
,
SSDataBlock
*
pBlock
)
{
SSortedMergeOperatorInfo
*
pInfo
=
pOperator
->
info
;
SqlFunctionCtx
*
pCtx
=
pInfo
->
binfo
.
pCtx
;
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
numOfCols
;
++
i
)
{
pCtx
[
i
].
size
=
1
;
}
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
++
i
)
{
if
(
!
pInfo
->
hasGroupVal
)
{
ASSERT
(
i
==
0
);
doMergeResultImpl
(
pInfo
,
pCtx
,
numOfExpr
,
i
);
pInfo
->
hasGroupVal
=
saveCurrentTuple
(
pInfo
->
groupVal
,
pInfo
->
groupInfo
,
pBlock
,
i
);
}
else
{
if
(
needToMerge
(
pBlock
,
pInfo
->
groupInfo
,
pInfo
->
groupVal
,
i
))
{
doMergeResultImpl
(
pInfo
,
pCtx
,
numOfExpr
,
i
);
}
else
{
doFinalizeResultImpl
(
pCtx
,
numOfExpr
);
int32_t
numOfRows
=
getNumOfResult
(
pInfo
->
binfo
.
pCtx
,
pOperator
->
numOfOutput
);
// setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows);
// TODO check for available buffer;
// next group info data
pInfo
->
binfo
.
pRes
->
info
.
rows
+=
numOfRows
;
for
(
int32_t
j
=
0
;
j
<
numOfExpr
;
++
j
)
{
if
(
pCtx
[
j
].
functionId
<
0
)
{
continue
;
}
pCtx
[
j
].
fpSet
->
addInput
(
&
pCtx
[
j
]);
}
doMergeResultImpl
(
pInfo
,
pCtx
,
numOfExpr
,
i
);
pInfo
->
hasGroupVal
=
saveCurrentTuple
(
pInfo
->
groupVal
,
pInfo
->
groupInfo
,
pBlock
,
i
);
}
}
}
}
static
SSDataBlock
*
doMerge
(
SOperatorInfo
*
pOperator
)
{
SSortedMergeOperatorInfo
*
pInfo
=
pOperator
->
info
;
SSortHandle
*
pHandle
=
pInfo
->
pSortHandle
;
SSDataBlock
*
pDataBlock
=
createOneDataBlock
(
pInfo
->
binfo
.
pRes
);
blockDataEnsureCapacity
(
pDataBlock
,
pInfo
->
binfo
.
capacity
);
while
(
1
)
{
blockDataClearup
(
pDataBlock
,
pInfo
->
hasVarCol
);
while
(
1
)
{
STupleHandle
*
pTupleHandle
=
sortNextTuple
(
pHandle
);
if
(
pTupleHandle
==
NULL
)
{
break
;
}
// build datablock for merge for one group
appendOneRowToDataBlock
(
pDataBlock
,
pTupleHandle
);
if
(
pDataBlock
->
info
.
rows
>=
pInfo
->
binfo
.
capacity
)
{
break
;
}
}
if
(
pDataBlock
->
info
.
rows
==
0
)
{
break
;
}
setInputDataBlock
(
pOperator
,
pInfo
->
binfo
.
pCtx
,
pDataBlock
,
TSDB_ORDER_ASC
);
// updateOutputBuf(&pInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor, pOperator->pRuntimeEnv, true);
doMergeImpl
(
pOperator
,
pOperator
->
numOfOutput
,
pDataBlock
);
// flush to tuple store, and after all data have been handled, return to upstream node or sink node
}
doFinalizeResultImpl
(
pInfo
->
binfo
.
pCtx
,
pOperator
->
numOfOutput
);
int32_t
numOfRows
=
getNumOfResult
(
pInfo
->
binfo
.
pCtx
,
pOperator
->
numOfOutput
);
// setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows);
// TODO check for available buffer;
// next group info data
pInfo
->
binfo
.
pRes
->
info
.
rows
+=
numOfRows
;
return
(
pInfo
->
binfo
.
pRes
->
info
.
rows
>
0
)
?
pInfo
->
binfo
.
pRes
:
NULL
;
}
static
SSDataBlock
*
doSortedMerge
(
void
*
param
,
bool
*
newgroup
)
{
SOperatorInfo
*
pOperator
=
(
SOperatorInfo
*
)
param
;
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
...
...
@@ -5675,7 +5830,7 @@ static SSDataBlock* doSortedMerge(void* param, bool* newgroup) {
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SSortedMergeOperatorInfo
*
pInfo
=
pOperator
->
info
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
return
getSortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
binfo
.
pRes
,
pInfo
->
hasVarCol
,
pInfo
->
numOfRowsInRes
);
return
getSortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
binfo
.
pRes
,
pInfo
->
hasVarCol
,
pInfo
->
binfo
.
capacity
);
}
SSchema
*
p
=
blockDataExtractSchema
(
pInfo
->
binfo
.
pRes
,
NULL
);
...
...
@@ -5698,7 +5853,7 @@ static SSDataBlock* doSortedMerge(void* param, bool* newgroup) {
}
pOperator
->
status
=
OP_RES_TO_RETURN
;
return
getSortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
binfo
.
pRes
,
pInfo
->
hasVarCol
,
pInfo
->
numOfRowsInRes
);
return
doMerge
(
pOperator
);
}
static
SArray
*
createBlockOrder
(
SArray
*
pExprInfo
,
SArray
*
pOrderVal
)
{
...
...
@@ -5724,29 +5879,88 @@ static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) {
return
pOrderInfo
;
}
SOperatorInfo
*
createSortedMergeOperatorInfo
(
SOperatorInfo
**
downstream
,
int32_t
numOfDownstream
,
SArray
*
pExprInfo
,
SArray
*
pOrderVal
,
SExecTaskInfo
*
pTaskInfo
)
{
static
int32_t
initGroupCol
(
SArray
*
pExprInfo
,
SArray
*
pGroupInfo
,
SSortedMergeOperatorInfo
*
pInfo
)
{
if
(
pGroupInfo
==
NULL
||
taosArrayGetSize
(
pGroupInfo
)
==
0
)
{
return
0
;
}
int32_t
len
=
0
;
SArray
*
plist
=
taosArrayInit
(
3
,
sizeof
(
SColumn
));
pInfo
->
groupInfo
=
taosArrayInit
(
3
,
sizeof
(
int32_t
));
if
(
plist
==
NULL
||
pInfo
->
groupInfo
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
size_t
numOfGroupCol
=
taosArrayGetSize
(
pInfo
->
groupInfo
);
for
(
int32_t
i
=
0
;
i
<
numOfGroupCol
;
++
i
)
{
SColumn
*
pCol
=
taosArrayGet
(
pGroupInfo
,
i
);
for
(
int32_t
j
=
0
;
j
<
taosArrayGetSize
(
pExprInfo
);
++
j
)
{
SExprInfo
*
pe
=
taosArrayGet
(
pExprInfo
,
j
);
if
(
pe
->
base
.
resSchema
.
colId
==
pCol
->
info
.
colId
)
{
taosArrayPush
(
plist
,
pCol
);
taosArrayPush
(
pInfo
->
groupInfo
,
&
j
);
len
+=
pCol
->
info
.
bytes
;
break
;
}
}
}
ASSERT
(
taosArrayGetSize
(
pGroupInfo
)
==
taosArrayGetSize
(
plist
));
pInfo
->
groupVal
=
calloc
(
1
,
(
POINTER_BYTES
*
numOfGroupCol
+
len
));
if
(
pInfo
->
groupVal
==
NULL
)
{
taosArrayDestroy
(
plist
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
int32_t
offset
=
0
;
char
*
start
=
(
char
*
)(
pInfo
->
groupVal
+
(
POINTER_BYTES
*
numOfGroupCol
));
for
(
int32_t
i
=
0
;
i
<
numOfGroupCol
;
++
i
)
{
pInfo
->
groupVal
[
i
]
=
start
+
offset
;
SColumn
*
pCol
=
taosArrayGet
(
plist
,
i
);
offset
+=
pCol
->
info
.
bytes
;
}
taosArrayDestroy
(
plist
);
return
TSDB_CODE_SUCCESS
;
}
SOperatorInfo
*
createSortedMergeOperatorInfo
(
SOperatorInfo
**
downstream
,
int32_t
numOfDownstream
,
SArray
*
pExprInfo
,
SArray
*
pOrderVal
,
SArray
*
pGroupInfo
,
SExecTaskInfo
*
pTaskInfo
)
{
SSortedMergeOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SSortedMergeOperatorInfo
));
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
tfree
(
pInfo
);
tfree
(
pOperator
);
terrno
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
NULL
;
goto
_error
;
}
int32_t
numOfOutput
=
taosArrayGetSize
(
pExprInfo
);
pInfo
->
binfo
.
capacity
=
4096
;
pInfo
->
binfo
.
pCtx
=
createSqlFunctionCtx_rv
(
pExprInfo
,
&
pInfo
->
binfo
.
rowCellInfoOffset
,
&
pInfo
->
binfo
.
resRowSize
);
pInfo
->
binfo
.
pCtx
=
createSqlFunctionCtx_rv
(
pExprInfo
,
&
pInfo
->
binfo
.
rowCellInfoOffset
,
&
pInfo
->
binfo
.
resRowSize
);
pInfo
->
binfo
.
pRes
=
createOutputBuf_rv
(
pExprInfo
,
pInfo
->
binfo
.
capacity
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
(
int32_t
)
1
);
if
(
pInfo
->
binfo
.
pCtx
==
NULL
||
pInfo
->
binfo
.
pRes
==
NULL
)
{
goto
_error
;
}
// pInfo->resultRowFactor =
// (int32_t)(getRowNumForMultioutput(pRuntimeEnv->pQueryAttr, pRuntimeEnv->pQueryAttr->topBotQuery, false));
int32_t
code
=
initAggSup
(
&
pInfo
->
aggSup
,
pExprInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
setDefaultOutputBuf_rv
(
&
pInfo
->
binfo
,
&
pInfo
->
aggSup
,
MAIN_SCAN
,
pTaskInfo
);
code
=
initGroupCol
(
pExprInfo
,
pGroupInfo
,
pInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
// pInfo->resultRowFactor = (int32_t)(getRowNumForMultioutput(pRuntimeEnv->pQueryAttr,
// pRuntimeEnv->pQueryAttr->topBotQuery, false));
pInfo
->
sortBufSize
=
1024
*
16
;
// 1MB
pInfo
->
bufPageSize
=
1024
;
pInfo
->
numOfRowsInRes
=
1024
;
pInfo
->
binfo
.
pRes
=
createOutputBuf_rv
(
pExprInfo
,
pInfo
->
numOfRowsInRes
);
pInfo
->
orderInfo
=
createBlockOrder
(
pExprInfo
,
pOrderVal
);
int32_t
numOfRows
=
1
;
pInfo
->
binfo
.
capacity
=
blockDataGetCapacityInRow
(
pInfo
->
binfo
.
pRes
,
pInfo
->
bufPageSize
)
;
pOperator
->
name
=
"SortedMerge"
;
pOperator
->
operatorType
=
OP_SortedMerge
;
...
...
@@ -5754,16 +5968,28 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
pExpr
=
exprArrayDup
(
pExprInfo
);
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
exec
=
doSortedMerge
;
pOperator
->
cleanupFn
=
destroySortedMergeOperatorInfo
;
for
(
int32_t
i
=
0
;
i
<
numOfDownstream
;
++
i
)
{
appendDownstream
(
pOperator
,
downstream
[
i
]);
code
=
appendDownstream
(
pOperator
,
downstream
,
numOfDownstream
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
return
pOperator
;
_error:
if
(
pInfo
!=
NULL
)
{
destroySortedMergeOperatorInfo
(
pInfo
,
numOfOutput
);
}
tfree
(
pInfo
);
tfree
(
pOperator
);
terrno
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
NULL
;
}
static
SSDataBlock
*
doSort
(
void
*
param
,
bool
*
newgroup
)
{
...
...
@@ -5844,7 +6070,7 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI
pOperator
->
exec
=
doSort
;
pOperator
->
cleanupFn
=
destroyOrderOperatorInfo
;
appendDownstream
(
pOperator
,
downstream
);
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
}
...
...
@@ -6761,18 +6987,37 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
tfree
(
pOperator
);
}
static
int32_t
initAggSup
(
SAggSupporter
*
pAggSup
,
SArray
*
pExprInfo
)
{
_hash_fn_t
hashFn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
);
pAggSup
->
keyBuf
=
calloc
(
1
,
sizeof
(
int64_t
)
+
sizeof
(
int64_t
)
+
POINTER_BYTES
);
pAggSup
->
pResultRowHashTable
=
taosHashInit
(
10
,
hashFn
,
true
,
HASH_NO_LOCK
);
pAggSup
->
pResultRowListSet
=
taosHashInit
(
100
,
hashFn
,
false
,
HASH_NO_LOCK
);
pAggSup
->
pool
=
initResultRowPool
(
getResultRowSize
(
pExprInfo
));
pAggSup
->
pResultRowArrayList
=
taosArrayInit
(
10
,
sizeof
(
SResultRowCell
));
if
(
pAggSup
->
keyBuf
==
NULL
||
pAggSup
->
pResultRowArrayList
==
NULL
||
pAggSup
->
pResultRowListSet
==
NULL
||
pAggSup
->
pResultRowHashTable
==
NULL
||
pAggSup
->
pool
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
return
TSDB_CODE_SUCCESS
;
}
static
void
clearupAggSup
(
SAggSupporter
*
pAggSup
)
{
tfree
(
pAggSup
->
keyBuf
);
taosHashCleanup
(
pAggSup
->
pResultRowHashTable
);
taosHashCleanup
(
pAggSup
->
pResultRowListSet
);
taosArrayDestroy
(
pAggSup
->
pResultRowArrayList
);
destroyResultRowPool
(
pAggSup
->
pool
);
}
static
int32_t
initAggInfo
(
SAggOperatorInfo
*
pInfo
,
SArray
*
pExprInfo
,
int32_t
numOfRows
,
const
STableGroupInfo
*
pTableGroupInfo
)
{
pInfo
->
binfo
.
pRes
=
createOutputBuf_rv
(
pExprInfo
,
numOfRows
);
pInfo
->
binfo
.
pCtx
=
createSqlFunctionCtx_rv
(
pExprInfo
,
&
pInfo
->
binfo
.
rowCellInfoOffset
,
&
pInfo
->
binfo
.
resRowSize
);
pInfo
->
binfo
.
capacity
=
4096
;
_hash_fn_t
hashFn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
);
pInfo
->
pResultRowHashTable
=
taosHashInit
(
10
,
hashFn
,
true
,
HASH_NO_LOCK
);
pInfo
->
pResultRowListSet
=
taosHashInit
(
100
,
hashFn
,
false
,
HASH_NO_LOCK
);
pInfo
->
pool
=
initResultRowPool
(
getResultRowSize
(
pExprInfo
));
pInfo
->
pResultRowArrayList
=
taosArrayInit
(
10
,
sizeof
(
SResultRowCell
));
initAggSup
(
&
pInfo
->
aggSup
,
pExprInfo
);
pInfo
->
pTableQueryInfo
=
calloc
(
pTableGroupInfo
->
numOfTables
,
sizeof
(
STableQueryInfo
));
int32_t
index
=
0
;
...
...
@@ -6801,7 +7046,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE
//(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
initAggInfo
(
pInfo
,
pExprInfo
,
numOfRows
,
pTableGroupInfo
);
setDefaultOutputBuf_rv
(
pInfo
,
MAIN_SCAN
,
pTaskInfo
);
setDefaultOutputBuf_rv
(
&
pInfo
->
binfo
,
&
pInfo
->
aggSup
,
MAIN_SCAN
,
pTaskInfo
);
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"TableAggregate"
;
...
...
@@ -6815,7 +7060,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
exec
=
doAggregate
;
pOperator
->
cleanupFn
=
destroyAggOperatorInfo
;
appendDownstream
(
pOperator
,
downstream
);
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
}
...
...
@@ -6899,7 +7144,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray
initAggInfo
(
pInfo
,
pExprInfo
,
numOfRows
,
pTableGroupInfo
);
size_t
tableGroup
=
taosArrayGetSize
(
pTableGroupInfo
->
pGroupList
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
(
int32_t
)
tableGroup
,
TSDB_DATA_TYPE_INT
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
(
int32_t
)
tableGroup
);
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"MultiTableAggregate"
;
...
...
@@ -6912,7 +7157,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray
pOperator
->
exec
=
doMultiTableAggregate
;
pOperator
->
cleanupFn
=
destroyAggOperatorInfo
;
appendDownstream
(
pOperator
,
downstream
);
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
}
...
...
@@ -6927,7 +7172,7 @@ SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator
pBInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pInfo
->
bufCapacity
);
pBInfo
->
pCtx
=
createSqlFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pBInfo
->
rowCellInfoOffset
);
initResultRowInfo
(
&
pBInfo
->
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
initResultRowInfo
(
&
pBInfo
->
resultRowInfo
,
8
);
setDefaultOutputBuf
(
pRuntimeEnv
,
pBInfo
,
pInfo
->
seed
,
MAIN_SCAN
);
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
...
...
@@ -6942,7 +7187,7 @@ SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator
pOperator
->
exec
=
doProjectOperation
;
pOperator
->
cleanupFn
=
destroyProjectOperatorInfo
;
appendDownstream
(
pOperator
,
downstream
);
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
}
...
...
@@ -7000,7 +7245,7 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
pOperator
->
info
=
pInfo
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
cleanupFn
=
destroyConditionOperatorInfo
;
appendDownstream
(
pOperator
,
downstream
);
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
}
...
...
@@ -7018,7 +7263,7 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn
pOperator
->
exec
=
doLimit
;
pOperator
->
info
=
pInfo
;
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
appendDownstream
(
pOperator
,
downstream
);
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
}
...
...
@@ -7028,7 +7273,7 @@ SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOpe
pInfo
->
pCtx
=
createSqlFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
rowCellInfoOffset
);
pInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
initResultRowInfo
(
&
pInfo
->
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
initResultRowInfo
(
&
pInfo
->
resultRowInfo
,
8
);
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
...
...
@@ -7043,7 +7288,7 @@ SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOpe
pOperator
->
exec
=
doIntervalAgg
;
pOperator
->
cleanupFn
=
destroyBasicOperatorInfo
;
appendDownstream
(
pOperator
,
downstream
);
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
}
...
...
@@ -7053,7 +7298,7 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S
pInfo
->
pCtx
=
createSqlFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
rowCellInfoOffset
);
pInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
initResultRowInfo
(
&
pInfo
->
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
initResultRowInfo
(
&
pInfo
->
resultRowInfo
,
8
);
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
...
...
@@ -7068,7 +7313,7 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S
pOperator
->
exec
=
doAllIntervalAgg
;
pOperator
->
cleanupFn
=
destroyBasicOperatorInfo
;
appendDownstream
(
pOperator
,
downstream
);
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
}
...
...
@@ -7078,7 +7323,7 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper
pInfo
->
reptScan
=
false
;
pInfo
->
binfo
.
pCtx
=
createSqlFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
binfo
.
rowCellInfoOffset
);
pInfo
->
binfo
.
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
8
);
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"StateWindowOperator"
;
...
...
@@ -7092,7 +7337,7 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper
pOperator
->
exec
=
doStateWindowAgg
;
pOperator
->
cleanupFn
=
destroyStateWindowOperatorInfo
;
appendDownstream
(
pOperator
,
downstream
);
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
}
SOperatorInfo
*
createSWindowOperatorInfo
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
...
...
@@ -7100,7 +7345,7 @@ SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator
pInfo
->
binfo
.
pCtx
=
createSqlFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
binfo
.
rowCellInfoOffset
);
pInfo
->
binfo
.
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
8
);
pInfo
->
prevTs
=
INT64_MIN
;
pInfo
->
reptScan
=
false
;
...
...
@@ -7117,7 +7362,7 @@ SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator
pOperator
->
exec
=
doSessionWindowAgg
;
pOperator
->
cleanupFn
=
destroySWindowOperatorInfo
;
appendDownstream
(
pOperator
,
downstream
);
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
}
...
...
@@ -7126,7 +7371,7 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim
pInfo
->
pCtx
=
createSqlFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
rowCellInfoOffset
);
pInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
initResultRowInfo
(
&
pInfo
->
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
initResultRowInfo
(
&
pInfo
->
resultRowInfo
,
8
);
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"MultiTableTimeIntervalOperator"
;
...
...
@@ -7141,7 +7386,7 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim
pOperator
->
exec
=
doSTableIntervalAgg
;
pOperator
->
cleanupFn
=
destroyBasicOperatorInfo
;
appendDownstream
(
pOperator
,
downstream
);
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
}
...
...
@@ -7150,7 +7395,7 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun
pInfo
->
pCtx
=
createSqlFunctionCtx
(
pRuntimeEnv
,
pExpr
,
numOfOutput
,
&
pInfo
->
rowCellInfoOffset
);
pInfo
->
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
initResultRowInfo
(
&
pInfo
->
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
initResultRowInfo
(
&
pInfo
->
resultRowInfo
,
8
);
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"AllMultiTableTimeIntervalOperator"
;
...
...
@@ -7165,12 +7410,11 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun
pOperator
->
exec
=
doAllSTableIntervalAgg
;
pOperator
->
cleanupFn
=
destroyBasicOperatorInfo
;
appendDownstream
(
pOperator
,
downstream
);
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
}
SOperatorInfo
*
createGroupbyOperatorInfo
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
)
{
SGroupbyOperatorInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SGroupbyOperatorInfo
));
pInfo
->
colIndex
=
-
1
;
// group by column index
...
...
@@ -7184,7 +7428,7 @@ SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator
(
int32_t
)(
getRowNumForMultioutput
(
pQueryAttr
,
pQueryAttr
->
topBotQuery
,
pQueryAttr
->
stableQuery
)));
pInfo
->
binfo
.
pRes
=
createOutputBuf
(
pExpr
,
numOfOutput
,
pRuntimeEnv
->
resultInfo
.
capacity
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
8
,
TSDB_DATA_TYPE_INT
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
8
);
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"GroupbyAggOperator"
;
...
...
@@ -7198,7 +7442,7 @@ SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator
pOperator
->
exec
=
hashGroupbyAggregate
;
pOperator
->
cleanupFn
=
destroyGroupbyOperatorInfo
;
appendDownstream
(
pOperator
,
downstream
);
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
}
...
...
@@ -7237,7 +7481,7 @@ SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInf
pOperator
->
exec
=
doFill
;
pOperator
->
cleanupFn
=
destroySFillOperatorInfo
;
appendDownstream
(
pOperator
,
downstream
);
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
}
...
...
@@ -7285,7 +7529,7 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
pOperator
->
pRuntimeEnv
=
pRuntimeEnv
;
pOperator
->
cleanupFn
=
destroySlimitOperatorInfo
;
appendDownstream
(
pOperator
,
downstream
);
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
}
...
...
@@ -7583,7 +7827,7 @@ SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperato
pOperator
->
pExpr
=
pExpr
;
pOperator
->
cleanupFn
=
destroyDistinctOperatorInfo
;
appendDownstream
(
pOperator
,
downstream
);
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
}
...
...
@@ -7963,7 +8207,6 @@ int32_t buildArithmeticExprFromMsg(SExprInfo *pExprInfo, void *pQueryMsg) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
updateOutputBufForTopBotQuery
(
SQueriedTableInfo
*
pTableInfo
,
SColumnInfo
*
pTagCols
,
SExprInfo
*
pExprs
,
int32_t
numOfOutput
,
int32_t
tagLen
,
bool
superTable
)
{
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
int16_t
functId
=
getExprFunctionId
(
&
pExprs
[
i
]);
...
...
@@ -8127,66 +8370,6 @@ int32_t createQueryFilter(char *data, uint16_t len, SFilterInfo** pFilters) {
// return ret;
}
// todo refactor
int32_t
createIndirectQueryFuncExprFromMsg
(
SQueryTableReq
*
pQueryMsg
,
int32_t
numOfOutput
,
SExprInfo
**
pExprInfo
,
SSqlExpr
**
pExpr
,
SExprInfo
*
prevExpr
,
struct
SUdfInfo
*
pUdfInfo
)
{
// *pExprInfo = NULL;
// int32_t code = TSDB_CODE_SUCCESS;
//
// SExprInfo *pExprs = (SExprInfo *)calloc(numOfOutput, sizeof(SExprInfo));
// if (pExprs == NULL) {
// return TSDB_CODE_QRY_OUT_OF_MEMORY;
// }
//
// bool isSuperTable = QUERY_IS_STABLE_QUERY(pQueryMsg->queryType);
//
// for (int32_t i = 0; i < numOfOutput; ++i) {
// pExprs[i].base = *pExpr[i];
// memset(pExprs[i].base.param, 0, sizeof(SVariant) * tListLen(pExprs[i].base.param));
//
// for (int32_t j = 0; j < pExpr[i]->numOfParams; ++j) {
// taosVariantAssign(&pExprs[i].base.param[j], &pExpr[i]->param[j]);
// }
//
// pExprs[i].base.resSchema.type = 0;
//
// int16_t type = 0;
// int16_t bytes = 0;
//
// // parse the arithmetic expression
// if (pExprs[i].base.functionId == FUNCTION_ARITHM) {
// code = buildArithmeticExprFromMsg(&pExprs[i], pQueryMsg);
//
// if (code != TSDB_CODE_SUCCESS) {
// tfree(pExprs);
// return code;
// }
//
// type = TSDB_DATA_TYPE_DOUBLE;
// bytes = tDataTypes[type].bytes;
// } else {
// int32_t index = pExprs[i].base.colInfo.colIndex;
// assert(prevExpr[index].base.resSchema.colId == pExprs[i].base.pColumns->info.colId);
//
// type = prevExpr[index].base.resSchema.type;
// bytes = prevExpr[index].base.resSchema.bytes;
// }
//
// int32_t param = (int32_t)pExprs[i].base.param[0].i;
// if (getResultDataInfo(type, bytes, functionId, param, &pExprs[i].base.resSchema.type, &pExprs[i].base.resSchema.bytes,
// &pExprs[i].base.interBytes, 0, isSuperTable, pUdfInfo) != TSDB_CODE_SUCCESS) {
// tfree(pExprs);
// return TSDB_CODE_QRY_INVALID_MSG;
// }
//
// assert(isValidDataType(pExprs[i].base.resSchema.type));
// }
//
// *pExprInfo = pExprs;
return
TSDB_CODE_SUCCESS
;
}
SGroupbyExpr
*
createGroupbyExprFromMsg
(
SQueryTableReq
*
pQueryMsg
,
SColIndex
*
pColIndex
,
int32_t
*
code
)
{
if
(
pQueryMsg
->
numOfGroupCols
==
0
)
{
return
NULL
;
...
...
@@ -8595,30 +8778,3 @@ void releaseQueryBuf(size_t numOfTables) {
// restore value is not enough buffer available
atomic_add_fetch_64
(
&
tsQueryBufferSizeBytes
,
t
);
}
void
freeQueryAttr
(
STaskAttr
*
pQueryAttr
)
{
if
(
pQueryAttr
!=
NULL
)
{
if
(
pQueryAttr
->
fillVal
!=
NULL
)
{
tfree
(
pQueryAttr
->
fillVal
);
}
pQueryAttr
->
pFilterInfo
=
doDestroyFilterInfo
(
pQueryAttr
->
pFilterInfo
,
pQueryAttr
->
numOfFilterCols
);
pQueryAttr
->
pExpr1
=
destroyQueryFuncExpr
(
pQueryAttr
->
pExpr1
,
pQueryAttr
->
numOfOutput
);
pQueryAttr
->
pExpr2
=
destroyQueryFuncExpr
(
pQueryAttr
->
pExpr2
,
pQueryAttr
->
numOfExpr2
);
pQueryAttr
->
pExpr3
=
destroyQueryFuncExpr
(
pQueryAttr
->
pExpr3
,
pQueryAttr
->
numOfExpr3
);
tfree
(
pQueryAttr
->
tagColList
);
tfree
(
pQueryAttr
->
pFilterInfo
);
pQueryAttr
->
tableCols
=
freeColumnInfo
(
pQueryAttr
->
tableCols
,
pQueryAttr
->
numOfCols
);
if
(
pQueryAttr
->
pGroupbyExpr
!=
NULL
)
{
taosArrayDestroy
(
pQueryAttr
->
pGroupbyExpr
->
columnInfo
);
tfree
(
pQueryAttr
->
pGroupbyExpr
);
}
// filterFreeInfo(pQueryAttr->pFilters);
}
}
source/libs/executor/src/tsort.c
浏览文件 @
b95e95dc
...
...
@@ -123,23 +123,6 @@ int32_t sortAddSource(SSortHandle* pSortHandle, void* pSource) {
taosArrayPush
(
pSortHandle
->
pOrderedSource
,
&
pSource
);
}
static
SSDataBlock
*
createDataBlock
(
const
SSDataBlock
*
pDataBlock
)
{
int32_t
numOfCols
=
pDataBlock
->
info
.
numOfCols
;
SSDataBlock
*
pBlock
=
calloc
(
1
,
sizeof
(
SSDataBlock
));
pBlock
->
pDataBlock
=
taosArrayInit
(
numOfCols
,
sizeof
(
SColumnInfoData
));
pBlock
->
info
.
numOfCols
=
numOfCols
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
colInfo
=
{
0
};
SColumnInfoData
*
p
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
i
);
colInfo
.
info
=
p
->
info
;
taosArrayPush
(
pBlock
->
pDataBlock
,
&
colInfo
);
}
return
pBlock
;
}
static
int32_t
doAddNewExternalMemSource
(
SDiskbasedBuf
*
pBuf
,
SArray
*
pAllSources
,
SSDataBlock
*
pBlock
,
int32_t
*
sourceId
)
{
SExternalMemSource
*
pSource
=
calloc
(
1
,
sizeof
(
SExternalMemSource
));
if
(
pSource
==
NULL
)
{
...
...
@@ -198,7 +181,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
blockDataClearup
(
pDataBlock
,
pHandle
->
hasVarCol
);
SSDataBlock
*
pBlock
=
createDataBlock
(
pDataBlock
);
SSDataBlock
*
pBlock
=
create
One
DataBlock
(
pDataBlock
);
int32_t
code
=
doAddNewExternalMemSource
(
pHandle
->
pBuf
,
pHandle
->
pOrderedSource
,
pBlock
,
&
pHandle
->
sourceId
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
@@ -263,7 +246,7 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, const SSDataBlock* pSou
if
(
isNull
)
{
colDataAppend
(
pColInfo
,
pBlock
->
info
.
rows
,
NULL
,
true
);
}
else
{
char
*
pData
=
colDataGet
(
pSrcColInfo
,
*
rowIndex
);
char
*
pData
=
colDataGet
Data
(
pSrcColInfo
,
*
rowIndex
);
colDataAppend
(
pColInfo
,
pBlock
->
info
.
rows
,
pData
,
false
);
}
}
...
...
@@ -279,15 +262,14 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa
*/
if
(
pSource
->
src
.
rowIndex
>=
pSource
->
src
.
pBlock
->
info
.
rows
)
{
pSource
->
src
.
rowIndex
=
0
;
pSource
->
pageIndex
+=
1
;
if
(
p
Source
->
pageIndex
>=
taosArrayGetSize
(
pSource
->
pageIdList
)
)
{
(
*
numOfCompleted
)
+=
1
;
pSource
->
src
.
rowIndex
=
-
1
;
pSource
->
pageIndex
=
-
1
;
pSource
->
src
.
pBlock
=
blockDataDestroy
(
pSource
->
src
.
pBlock
)
;
}
else
{
if
(
pHandle
->
type
==
SORT_SINGLESOURCE_SORT
)
{
if
(
p
Handle
->
type
==
SORT_SINGLESOURCE_SORT
)
{
if
(
pSource
->
pageIndex
>=
taosArrayGetSize
(
pSource
->
pageIdList
))
{
(
*
numOfCompleted
)
+=
1
;
pSource
->
src
.
rowIndex
=
-
1
;
pSource
->
pageIndex
=
-
1
;
pSource
->
src
.
pBlock
=
blockDataDestroy
(
pSource
->
src
.
pBlock
);
}
else
{
SPageInfo
*
pPgInfo
=
*
(
SPageInfo
**
)
taosArrayGet
(
pSource
->
pageIdList
,
pSource
->
pageIndex
);
SFilePage
*
pPage
=
getBufPage
(
pHandle
->
pBuf
,
getPageId
(
pPgInfo
));
...
...
@@ -297,12 +279,12 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa
}
releaseBufPage
(
pHandle
->
pBuf
,
pPage
);
}
else
{
pSource
->
src
.
pBlock
=
pHandle
->
fetchfp
(((
SGenericSource
*
)
pSource
)
->
param
);
if
(
pSource
->
src
.
pBlock
==
NULL
)
{
(
*
numOfCompleted
)
+=
1
;
pSource
->
src
.
rowIndex
=
-
1
;
}
}
}
else
{
pSource
->
src
.
pBlock
=
pHandle
->
fetchfp
(((
SGenericSource
*
)
pSource
)
->
param
);
if
(
pSource
->
src
.
pBlock
==
NULL
)
{
(
*
numOfCompleted
)
+=
1
;
pSource
->
src
.
rowIndex
=
-
1
;
}
}
}
...
...
@@ -404,8 +386,8 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
return
pParam
->
nullFirst
?
-
1
:
1
;
}
void
*
left1
=
colDataGet
(
pLeftColInfoData
,
pLeftSource
->
src
.
rowIndex
);
void
*
right1
=
colDataGet
(
pRightColInfoData
,
pRightSource
->
src
.
rowIndex
);
void
*
left1
=
colDataGet
Data
(
pLeftColInfoData
,
pLeftSource
->
src
.
rowIndex
);
void
*
right1
=
colDataGet
Data
(
pRightColInfoData
,
pRightSource
->
src
.
rowIndex
);
switch
(
pLeftColInfoData
->
info
.
type
)
{
case
TSDB_DATA_TYPE_INT
:
{
...
...
@@ -499,7 +481,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
tMergeTreeDestroy
(
pHandle
->
pMergeTree
);
pHandle
->
numOfCompletedSources
=
0
;
SSDataBlock
*
pBlock
=
createDataBlock
(
pHandle
->
pDataBlock
);
SSDataBlock
*
pBlock
=
create
One
DataBlock
(
pHandle
->
pDataBlock
);
code
=
doAddNewExternalMemSource
(
pHandle
->
pBuf
,
pResList
,
pBlock
,
&
pHandle
->
sourceId
);
if
(
code
!=
0
)
{
return
code
;
...
...
@@ -545,7 +527,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
}
if
(
pHandle
->
pDataBlock
==
NULL
)
{
pHandle
->
pDataBlock
=
createDataBlock
(
pBlock
);
pHandle
->
pDataBlock
=
create
One
DataBlock
(
pBlock
);
}
int32_t
code
=
blockDataMerge
(
pHandle
->
pDataBlock
,
pBlock
);
...
...
@@ -646,6 +628,7 @@ STupleHandle* sortNextTuple(SSortHandle* pHandle) {
return
NULL
;
}
// All the data are hold in the buffer, no external sort is invoked.
if
(
pHandle
->
inMemSort
)
{
pHandle
->
tupleHandle
.
rowIndex
+=
1
;
if
(
pHandle
->
tupleHandle
.
rowIndex
==
pHandle
->
pDataBlock
->
info
.
rows
)
{
...
...
@@ -671,6 +654,7 @@ STupleHandle* sortNextTuple(SSortHandle* pHandle) {
return
NULL
;
}
// Get the adjusted value after the loser tree is updated.
index
=
tMergeTreeGetChosenIndex
(
pHandle
->
pMergeTree
);
pSource
=
pHandle
->
cmpParam
.
pSources
[
index
];
...
...
@@ -691,5 +675,5 @@ bool sortIsValueNull(STupleHandle* pVHandle, int32_t colIndex) {
void
*
sortGetValue
(
STupleHandle
*
pVHandle
,
int32_t
colIndex
)
{
SColumnInfoData
*
pColInfo
=
TARRAY_GET_ELEM
(
pVHandle
->
pBlock
->
pDataBlock
,
colIndex
);
return
colDataGet
(
pColInfo
,
pVHandle
->
rowIndex
);
return
colDataGet
Data
(
pColInfo
,
pVHandle
->
rowIndex
);
}
source/libs/executor/test/executorTests.cpp
浏览文件 @
b95e95dc
...
...
@@ -42,22 +42,21 @@ enum {
};
typedef
struct
SDummyInputInfo
{
int32_t
max
;
int32_t
current
;
int32_t
startVal
;
int32_t
type
;
int32_t
totalPages
;
// numOfPages
int32_t
current
;
int32_t
startVal
;
int32_t
type
;
int32_t
numOfRowsPerPage
;
SSDataBlock
*
pBlock
;
}
SDummyInputInfo
;
SSDataBlock
*
getDummyBlock
(
void
*
param
,
bool
*
newgroup
)
{
SOperatorInfo
*
pOperator
=
static_cast
<
SOperatorInfo
*>
(
param
);
SDummyInputInfo
*
pInfo
=
static_cast
<
SDummyInputInfo
*>
(
pOperator
->
info
);
if
(
pInfo
->
current
>=
pInfo
->
max
)
{
if
(
pInfo
->
current
>=
pInfo
->
totalPages
)
{
return
NULL
;
}
int32_t
numOfRows
=
1000
;
if
(
pInfo
->
pBlock
==
NULL
)
{
pInfo
->
pBlock
=
static_cast
<
SSDataBlock
*>
(
calloc
(
1
,
sizeof
(
SSDataBlock
)));
...
...
@@ -67,8 +66,8 @@ SSDataBlock* getDummyBlock(void* param, bool* newgroup) {
colInfo
.
info
.
type
=
TSDB_DATA_TYPE_INT
;
colInfo
.
info
.
bytes
=
sizeof
(
int32_t
);
colInfo
.
info
.
colId
=
1
;
colInfo
.
pData
=
static_cast
<
char
*>
(
calloc
(
numOfRows
,
sizeof
(
int32_t
)));
colInfo
.
nullbitmap
=
static_cast
<
char
*>
(
calloc
(
1
,
(
numOfRows
+
7
)
/
8
));
colInfo
.
pData
=
static_cast
<
char
*>
(
calloc
(
pInfo
->
numOfRowsPerPage
,
sizeof
(
int32_t
)));
colInfo
.
nullbitmap
=
static_cast
<
char
*>
(
calloc
(
1
,
(
pInfo
->
numOfRowsPerPage
+
7
)
/
8
));
taosArrayPush
(
pInfo
->
pBlock
->
pDataBlock
,
&
colInfo
);
...
...
@@ -91,7 +90,7 @@ SSDataBlock* getDummyBlock(void* param, bool* newgroup) {
char
buf
[
128
]
=
{
0
};
char
b1
[
128
]
=
{
0
};
int32_t
v
=
0
;
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pInfo
->
numOfRowsPerPage
;
++
i
)
{
SColumnInfoData
*
pColInfo
=
static_cast
<
SColumnInfoData
*>
(
TARRAY_GET_ELEM
(
pBlock
->
pDataBlock
,
0
));
if
(
pInfo
->
type
==
data_desc
)
{
...
...
@@ -111,22 +110,23 @@ SSDataBlock* getDummyBlock(void* param, bool* newgroup) {
// colDataAppend(pColInfo2, i, b1, false);
}
pBlock
->
info
.
rows
=
numOfRows
;
pBlock
->
info
.
rows
=
pInfo
->
numOfRowsPerPage
;
pBlock
->
info
.
numOfCols
=
1
;
pInfo
->
current
+=
1
;
return
pBlock
;
}
SOperatorInfo
*
createDummyOperator
(
int32_t
numOfBlocks
,
int32_t
type
)
{
SOperatorInfo
*
createDummyOperator
(
int32_t
startVal
,
int32_t
numOfBlocks
,
int32_t
rowsPerPage
,
int32_t
type
)
{
SOperatorInfo
*
pOperator
=
static_cast
<
SOperatorInfo
*>
(
calloc
(
1
,
sizeof
(
SOperatorInfo
)));
pOperator
->
name
=
"dummyInputOpertor4Test"
;
pOperator
->
exec
=
getDummyBlock
;
SDummyInputInfo
*
pInfo
=
(
SDummyInputInfo
*
)
calloc
(
1
,
sizeof
(
SDummyInputInfo
));
pInfo
->
max
=
numOfBlocks
;
pInfo
->
startVal
=
1500000
;
pInfo
->
type
=
type
;
pInfo
->
totalPages
=
numOfBlocks
;
pInfo
->
startVal
=
startVal
;
pInfo
->
numOfRowsPerPage
=
rowsPerPage
;
pInfo
->
type
=
type
;
pOperator
->
info
=
pInfo
;
return
pOperator
;
...
...
@@ -257,7 +257,7 @@ TEST(testCase, inMem_sort_Test) {
SColumnInfoData* pCol1 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 0));
SColumnInfoData* pCol2 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 1));
for(int32_t i = 0; i < pRes->info.rows; ++i) {
char* p = colDataGet(pCol2, i);
char* p = colDataGet
Data
(pCol2, i);
printf("%d: %d, %s\n", i, ((int32_t*)pCol1->pData)[i], (char*)varDataVal(p));
}
}
...
...
@@ -341,7 +341,7 @@ TEST(testCase, external_sort_Test) {
SColumnInfoData* pCol1 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 0));
// SColumnInfoData* pCol2 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 1));
for (int32_t i = 0; i < pRes->info.rows; ++i) {
// char* p = colDataGet(pCol2, i);
// char* p = colDataGet
Data
(pCol2, i);
printf("%d: %d\n", total++, ((int32_t*)pCol1->pData)[i]);
// printf("%d: %d, %s\n", total++, ((int32_t*)pCol1->pData)[i], (char*)varDataVal(p));
}
...
...
@@ -357,6 +357,7 @@ TEST(testCase, external_sort_Test) {
taosArrayDestroy(pOrderVal);
}
#endif
TEST
(
testCase
,
sorted_merge_Test
)
{
srand
(
time
(
NULL
));
...
...
@@ -370,7 +371,12 @@ TEST(testCase, sorted_merge_Test) {
SArray
*
pExprInfo
=
taosArrayInit
(
4
,
sizeof
(
SExprInfo
));
SExprInfo
*
exp
=
static_cast
<
SExprInfo
*>
(
calloc
(
1
,
sizeof
(
SExprInfo
)));
exp->base.resSchema = createSchema(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1, "res");
exp
->
base
.
resSchema
=
createSchema
(
TSDB_DATA_TYPE_BIGINT
,
sizeof
(
int64_t
),
1
,
"count_result"
);
exp
->
base
.
pColumns
=
static_cast
<
SColumn
*>
(
calloc
(
1
,
sizeof
(
SColumn
)));
exp
->
base
.
pColumns
->
flag
=
TSDB_COL_NORMAL
;
exp
->
base
.
pColumns
->
info
=
(
SColumnInfo
)
{.
colId
=
1
,
.
type
=
TSDB_DATA_TYPE_INT
,
.
bytes
=
4
};
exp
->
base
.
numOfCols
=
1
;
taosArrayPush
(
pExprInfo
,
&
exp
);
SExprInfo
*
exp1
=
static_cast
<
SExprInfo
*>
(
calloc
(
1
,
sizeof
(
SExprInfo
)));
...
...
@@ -380,10 +386,10 @@ TEST(testCase, sorted_merge_Test) {
int32_t
numOfSources
=
10
;
SOperatorInfo
**
plist
=
(
SOperatorInfo
**
)
calloc
(
numOfSources
,
sizeof
(
void
*
));
for
(
int32_t
i
=
0
;
i
<
numOfSources
;
++
i
)
{
plist[i] = createDummyOperator(1, data_asc);
plist
[
i
]
=
createDummyOperator
(
1
,
1
,
1
,
data_asc
);
}
SOperatorInfo* pOperator = createSortedMergeOperatorInfo(plist, numOfSources, pExprInfo, pOrderVal, NULL);
SOperatorInfo
*
pOperator
=
createSortedMergeOperatorInfo
(
plist
,
numOfSources
,
pExprInfo
,
pOrderVal
,
NULL
,
NULL
);
bool
newgroup
=
false
;
SSDataBlock
*
pRes
=
NULL
;
...
...
@@ -409,8 +415,8 @@ TEST(testCase, sorted_merge_Test) {
SColumnInfoData
*
pCol1
=
static_cast
<
SColumnInfoData
*>
(
taosArrayGet
(
pRes
->
pDataBlock
,
0
));
// SColumnInfoData* pCol2 = static_cast<SColumnInfoData*>(taosArrayGet(pRes->pDataBlock, 1));
for
(
int32_t
i
=
0
;
i
<
pRes
->
info
.
rows
;
++
i
)
{
// char* p = colDataGet(pCol2, i);
printf("%d: %
d\n", total++, ((int32
_t*)pCol1->pData)[i]);
// char* p = colDataGet
Data
(pCol2, i);
printf
(
"%d: %
ld
\n
"
,
total
++
,
((
int64
_t
*
)
pCol1
->
pData
)[
i
]);
// printf("%d: %d, %s\n", total++, ((int32_t*)pCol1->pData)[i], (char*)varDataVal(p));
}
}
...
...
@@ -424,5 +430,4 @@ TEST(testCase, sorted_merge_Test) {
taosArrayDestroy
(
pExprInfo
);
taosArrayDestroy
(
pOrderVal
);
}
#endif
#pragma GCC diagnostic pop
source/libs/executor/test/executorUtilTests.cpp
浏览文件 @
b95e95dc
...
...
@@ -123,8 +123,8 @@ int32_t docomp(const void* p1, const void* p2, void* param) {
return
pParam
->
nullFirst
?
-
1
:
1
;
}
void
*
left1
=
colDataGet
(
pLeftColInfoData
,
pLeftSource
->
src
.
rowIndex
);
void
*
right1
=
colDataGet
(
pRightColInfoData
,
pRightSource
->
src
.
rowIndex
);
void
*
left1
=
colDataGet
Data
(
pLeftColInfoData
,
pLeftSource
->
src
.
rowIndex
);
void
*
right1
=
colDataGet
Data
(
pRightColInfoData
,
pRightSource
->
src
.
rowIndex
);
switch
(
pLeftColInfoData
->
info
.
type
)
{
case
TSDB_DATA_TYPE_INT
:
{
...
...
@@ -148,48 +148,15 @@ int32_t docomp(const void* p1, const void* p2, void* param) {
}
}
// namespace
//TEST(testCase, inMem_sort_Test) {
// SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
// SOrder o = {.order = TSDB_ORDER_ASC};
// o.col.info.colId = 1;
// o.col.info.type = TSDB_DATA_TYPE_INT;
// taosArrayPush(pOrderVal, &o);
//
// int32_t numOfRows = 1000;
// SBlockOrderInfo oi = {0};
// oi.order = TSDB_ORDER_ASC;
// oi.colIndex = 0;
// SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
// taosArrayPush(orderInfo, &oi);
//
// SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_SINGLESOURCE_SORT, 1024, 5, "test_abc");
// setFetchRawDataFp(phandle, getSingleColDummyBlock);
// sortAddSource(phandle, &numOfRows);
//
// int32_t code = sortOpen(phandle);
// int32_t row = 1;
//
// while(1) {
// STupleHandle* pTupleHandle = sortNextTuple(phandle);
// if (pTupleHandle == NULL) {
// break;
// }
//
// void* v = sortGetValue(pTupleHandle, 0);
// printf("%d: %d\n", row++, *(int32_t*) v);
//
// }
// destroySortHandle(phandle);
//}
//
TEST
(
testCase
,
external_mem_sort_Test
)
{
#if 0
TEST(testCase, inMem_sort_Test) {
SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
SOrder o = {.order = TSDB_ORDER_ASC};
o.col.info.colId = 1;
o.col.info.type = TSDB_DATA_TYPE_INT;
taosArrayPush(pOrderVal, &o);
//
int32_t numOfRows = 1000;
int32_t numOfRows = 1000;
SBlockOrderInfo oi = {0};
oi.order = TSDB_ORDER_ASC;
oi.colIndex = 0;
...
...
@@ -197,7 +164,40 @@ TEST(testCase, external_mem_sort_Test) {
taosArrayPush(orderInfo, &oi);
SSchema s = {.type = TSDB_DATA_TYPE_INT, .colId = 1, .bytes = 4, };
SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_SINGLESOURCE_SORT, 1024, 5, &s, 1, "test_abc");
setFetchRawDataFp(phandle, getSingleColDummyBlock);
sortAddSource(phandle, &numOfRows);
int32_t code = sortOpen(phandle);
int32_t row = 1;
while(1) {
STupleHandle* pTupleHandle = sortNextTuple(phandle);
if (pTupleHandle == NULL) {
break;
}
void* v = sortGetValue(pTupleHandle, 0);
printf("%d: %d\n", row++, *(int32_t*) v);
}
destroySortHandle(phandle);
}
TEST(testCase, external_mem_sort_Test) {
SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
SOrder o = {.order = TSDB_ORDER_ASC};
o.col.info.colId = 1;
o.col.info.type = TSDB_DATA_TYPE_INT;
taosArrayPush(pOrderVal, &o);
SBlockOrderInfo oi = {0};
oi.order = TSDB_ORDER_ASC;
oi.colIndex = 0;
SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
taosArrayPush(orderInfo, &oi);
SSchema s = {.type = TSDB_DATA_TYPE_INT, .colId = 1, .bytes = 4, };
SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_SINGLESOURCE_SORT, 1024, 5, &s, 1, "test_abc");
setFetchRawDataFp(phandle, getSingleColDummyBlock);
...
...
@@ -227,50 +227,52 @@ TEST(testCase, external_mem_sort_Test) {
destroySortHandle(phandle);
}
//TEST(testCase, ordered_merge_sort_Test) {
// SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
// SOrder o = {.order = TSDB_ORDER_ASC};
// o.col.info.colId = 1;
// o.col.info.type = TSDB_DATA_TYPE_INT;
// taosArrayPush(pOrderVal, &o);
//
// int32_t numOfRows = 1000;
// SBlockOrderInfo oi = {0};
// oi.order = TSDB_ORDER_ASC;
// oi.colIndex = 0;
// SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
// taosArrayPush(orderInfo, &oi);
//
// SSchema s = {.type = TSDB_DATA_TYPE_INT, .colId = 1, .bytes = 4};
// SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_MULTISOURCE_MERGE, 1024, 5, &s, 1,"test_abc");
// setFetchRawDataFp(phandle, getSingleColDummyBlock);
// setComparFn(phandle, docomp);
//
// for(int32_t i = 0; i < 10; ++i) {
// SOperatorSource* p = static_cast<SOperatorSource*>(calloc(1, sizeof(SOperatorSource)));
// _info* c = static_cast<_info*>(calloc(1, sizeof(_info)));
// c->count = 1;
// c->pageRows = 1000;
// c->startVal = 0;
//
// p->param = c;
// sortAddSource(phandle, p);
// }
//
// int32_t code = sortOpen(phandle);
// int32_t row = 1;
//
// while(1) {
// STupleHandle* pTupleHandle = sortNextTuple(phandle);
// if (pTupleHandle == NULL) {
// break;
// }
//
// void* v = sortGetValue(pTupleHandle, 0);
// printf("%d: %d\n", row++, *(int32_t*) v);
//
// }
// destroySortHandle(phandle);
//}
TEST(testCase, ordered_merge_sort_Test) {
SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
SOrder o = {.order = TSDB_ORDER_ASC};
o.col.info.colId = 1;
o.col.info.type = TSDB_DATA_TYPE_INT;
taosArrayPush(pOrderVal, &o);
int32_t numOfRows = 1000;
SBlockOrderInfo oi = {0};
oi.order = TSDB_ORDER_ASC;
oi.colIndex = 0;
SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
taosArrayPush(orderInfo, &oi);
SSchema s = {.type = TSDB_DATA_TYPE_INT, .colId = 1, .bytes = 4};
SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_MULTISOURCE_MERGE, 1024, 5, &s, 1,"test_abc");
setFetchRawDataFp(phandle, getSingleColDummyBlock);
setComparFn(phandle, docomp);
for(int32_t i = 0; i < 10; ++i) {
SGenericSource* p = static_cast<SGenericSource*>(calloc(1, sizeof(SGenericSource)));
_info* c = static_cast<_info*>(calloc(1, sizeof(_info)));
c->count = 1;
c->pageRows = 1000;
c->startVal = 0;
p->param = c;
sortAddSource(phandle, p);
}
int32_t code = sortOpen(phandle);
int32_t row = 1;
while(1) {
STupleHandle* pTupleHandle = sortNextTuple(phandle);
if (pTupleHandle == NULL) {
break;
}
void* v = sortGetValue(pTupleHandle, 0);
printf("%d: %d\n", row++, *(int32_t*) v);
}
destroySortHandle(phandle);
}
#endif
#pragma GCC diagnostic pop
source/libs/function/src/taggfunction.c
浏览文件 @
b95e95dc
...
...
@@ -30,9 +30,10 @@
#include "tcompression.h"
//#include "queryLog.h"
#include "tudf.h"
#include "tep.h"
#define GET_INPUT_DATA_LIST(x) ((char *)((x)->pInput))
#define GET_INPUT_DATA(x, y) (
GET_INPUT_DATA_LIST(x) + (y) * (x)->inputBytes
)
#define GET_INPUT_DATA(x, y) (
(char*) colDataGetData((x)->pInput, (y))
)
#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList))
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])
...
...
@@ -3817,7 +3818,7 @@ static void interp_function_impl(SqlFunctionCtx *pCtx) {
skey
=
ekey
;
}
}
assignVal
(
pCtx
->
pOutput
,
pCtx
->
pInput
,
pCtx
->
resDataInfo
.
bytes
,
pCtx
->
inputType
);
//
assignVal(pCtx->pOutput, pCtx->pInput, pCtx->resDataInfo.bytes, pCtx->inputType);
}
else
if
(
type
==
TSDB_FILL_NEXT
)
{
TSKEY
ekey
=
skey
;
char
*
val
=
NULL
;
...
...
source/libs/parser/src/queryInfoUtil.c
浏览文件 @
b95e95dc
...
...
@@ -230,7 +230,7 @@ int32_t getExprFunctionId(SExprInfo *pExprInfo) {
}
void
assignExprInfo
(
SExprInfo
*
dst
,
const
SExprInfo
*
src
)
{
assert
(
dst
!=
NULL
&&
src
!=
NULL
);
assert
(
dst
!=
NULL
&&
src
!=
NULL
&&
src
->
base
.
numOfCols
>
0
);
*
dst
=
*
src
;
#if 0
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录