Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e9fa0018
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e9fa0018
编写于
3月 21, 2023
作者:
G
Ganlin Zhao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh(query): improve mode function page buffer allocation
上级
a070da04
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
62 addition
and
34 deletion
+62
-34
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+62
-34
未找到文件。
source/libs/function/src/builtinsimpl.c
浏览文件 @
e9fa0018
...
@@ -244,12 +244,11 @@ typedef struct SUniqueInfo {
...
@@ -244,12 +244,11 @@ typedef struct SUniqueInfo {
typedef
struct
SModeItem
{
typedef
struct
SModeItem
{
int64_t
count
;
int64_t
count
;
STuplePos
dataPos
;
STuplePos
tuplePos
;
STuplePos
tuplePos
;
char
data
[];
}
SModeItem
;
}
SModeItem
;
typedef
struct
SModeInfo
{
typedef
struct
SModeInfo
{
int32_t
numOfPoints
;
uint8_t
colType
;
uint8_t
colType
;
int16_t
colBytes
;
int16_t
colBytes
;
SHashObj
*
pHash
;
SHashObj
*
pHash
;
...
@@ -257,7 +256,7 @@ typedef struct SModeInfo {
...
@@ -257,7 +256,7 @@ typedef struct SModeInfo {
STuplePos
nullTuplePos
;
STuplePos
nullTuplePos
;
bool
nullTupleSaved
;
bool
nullTupleSaved
;
char
pItems
[];
char
*
buf
;
// serialize data buffer
}
SModeInfo
;
}
SModeInfo
;
typedef
struct
SDerivInfo
{
typedef
struct
SDerivInfo
{
...
@@ -3113,7 +3112,7 @@ void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsid
...
@@ -3113,7 +3112,7 @@ void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsid
return
buf
;
return
buf
;
}
}
static
int32_t
doSaveTupleData
(
SSerializeDataHandle
*
pHandle
,
const
void
*
pBuf
,
size_t
length
,
STupleKey
key
,
static
int32_t
doSaveTupleData
(
SSerializeDataHandle
*
pHandle
,
const
void
*
pBuf
,
size_t
length
,
STupleKey
*
key
,
STuplePos
*
pPos
)
{
STuplePos
*
pPos
)
{
STuplePos
p
=
{
0
};
STuplePos
p
=
{
0
};
if
(
pHandle
->
pBuf
!=
NULL
)
{
if
(
pHandle
->
pBuf
!=
NULL
)
{
...
@@ -3149,8 +3148,8 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf,
...
@@ -3149,8 +3148,8 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf,
releaseBufPage
(
pHandle
->
pBuf
,
pPage
);
releaseBufPage
(
pHandle
->
pBuf
,
pPage
);
}
else
{
}
else
{
// other tuple save policy
// other tuple save policy
if
(
streamStateFuncPut
(
pHandle
->
pState
,
&
key
,
pBuf
,
length
)
>=
0
)
{
if
(
streamStateFuncPut
(
pHandle
->
pState
,
key
,
pBuf
,
length
)
>=
0
)
{
p
.
streamTupleKey
=
key
;
p
.
streamTupleKey
=
*
key
;
}
}
}
}
...
@@ -3174,7 +3173,7 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock*
...
@@ -3174,7 +3173,7 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock*
}
}
char
*
buf
=
serializeTupleData
(
pSrcBlock
,
rowIndex
,
&
pCtx
->
subsidiaries
,
pCtx
->
subsidiaries
.
buf
);
char
*
buf
=
serializeTupleData
(
pSrcBlock
,
rowIndex
,
&
pCtx
->
subsidiaries
,
pCtx
->
subsidiaries
.
buf
);
return
doSaveTupleData
(
&
pCtx
->
saveHandle
,
buf
,
pCtx
->
subsidiaries
.
rowLen
,
key
,
pPos
);
return
doSaveTupleData
(
&
pCtx
->
saveHandle
,
buf
,
pCtx
->
subsidiaries
.
rowLen
,
&
key
,
pPos
);
}
}
static
int32_t
doUpdateTupleData
(
SSerializeDataHandle
*
pHandle
,
const
void
*
pBuf
,
size_t
length
,
STuplePos
*
pPos
)
{
static
int32_t
doUpdateTupleData
(
SSerializeDataHandle
*
pHandle
,
const
void
*
pBuf
,
size_t
length
,
STuplePos
*
pPos
)
{
...
@@ -4949,7 +4948,7 @@ int32_t uniqueFunction(SqlFunctionCtx* pCtx) {
...
@@ -4949,7 +4948,7 @@ int32_t uniqueFunction(SqlFunctionCtx* pCtx) {
}
}
bool
getModeFuncEnv
(
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
)
{
bool
getModeFuncEnv
(
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
)
{
pEnv
->
calcMemSize
=
sizeof
(
SModeInfo
)
+
MODE_MAX_RESULT_SIZE
;
pEnv
->
calcMemSize
=
sizeof
(
SModeInfo
);
return
true
;
return
true
;
}
}
...
@@ -4959,7 +4958,6 @@ bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
...
@@ -4959,7 +4958,6 @@ bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
}
}
SModeInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
SModeInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
pInfo
->
numOfPoints
=
0
;
pInfo
->
colType
=
pCtx
->
resDataInfo
.
type
;
pInfo
->
colType
=
pCtx
->
resDataInfo
.
type
;
pInfo
->
colBytes
=
pCtx
->
resDataInfo
.
bytes
;
pInfo
->
colBytes
=
pCtx
->
resDataInfo
.
bytes
;
if
(
pInfo
->
pHash
!=
NULL
)
{
if
(
pInfo
->
pHash
!=
NULL
)
{
...
@@ -4970,38 +4968,60 @@ bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
...
@@ -4970,38 +4968,60 @@ bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
pInfo
->
nullTupleSaved
=
false
;
pInfo
->
nullTupleSaved
=
false
;
pInfo
->
nullTuplePos
.
pageId
=
-
1
;
pInfo
->
nullTuplePos
.
pageId
=
-
1
;
pInfo
->
buf
=
taosMemoryMalloc
(
pInfo
->
colBytes
);
return
true
;
return
true
;
}
}
static
void
modeFunctionCleanup
(
SModeInfo
*
pInfo
)
{
taosHashCleanup
(
pInfo
->
pHash
);
taosMemoryFreeClear
(
pInfo
->
buf
);
}
static
int32_t
saveModeTupleData
(
SqlFunctionCtx
*
pCtx
,
char
*
data
,
SModeInfo
*
pInfo
,
STuplePos
*
pPos
)
{
if
(
IS_VAR_DATA_TYPE
(
pInfo
->
colType
))
{
memcpy
(
pInfo
->
buf
,
data
,
varDataTLen
(
data
));
}
else
{
memcpy
(
pInfo
->
buf
,
data
,
pInfo
->
colBytes
);
}
return
doSaveTupleData
(
&
pCtx
->
saveHandle
,
pInfo
->
buf
,
pInfo
->
colBytes
,
NULL
,
pPos
);
}
static
int32_t
doModeAdd
(
SModeInfo
*
pInfo
,
int32_t
rowIndex
,
SqlFunctionCtx
*
pCtx
,
char
*
data
)
{
static
int32_t
doModeAdd
(
SModeInfo
*
pInfo
,
int32_t
rowIndex
,
SqlFunctionCtx
*
pCtx
,
char
*
data
)
{
int32_t
hashKeyBytes
=
IS_STR_DATA_TYPE
(
pInfo
->
colType
)
?
varDataTLen
(
data
)
:
pInfo
->
colBytes
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
SModeItem
**
pHashItem
=
taosHashGet
(
pInfo
->
pHash
,
data
,
hashKeyBytes
);
int32_t
hashKeyBytes
=
IS_STR_DATA_TYPE
(
pInfo
->
colType
)
?
varDataTLen
(
data
)
:
pInfo
->
colBytes
;
SModeItem
*
pHashItem
=
(
SModeItem
*
)
taosHashGet
(
pInfo
->
pHash
,
data
,
hashKeyBytes
);
if
(
pHashItem
==
NULL
)
{
if
(
pHashItem
==
NULL
)
{
int32_t
size
=
sizeof
(
SModeItem
)
+
pInfo
->
colBytes
;
int32_t
size
=
sizeof
(
SModeItem
);
SModeItem
*
pItem
=
(
SModeItem
*
)(
pInfo
->
pItems
+
pInfo
->
numOfPoints
*
size
);
SModeItem
item
=
{
0
};
memcpy
(
pItem
->
data
,
data
,
hashKeyBytes
);
pItem
->
count
+=
1
;
item
.
count
+=
1
;
code
=
saveModeTupleData
(
pCtx
,
data
,
pInfo
,
&
item
.
dataPos
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
if
(
pCtx
->
subsidiaries
.
num
>
0
)
{
if
(
pCtx
->
subsidiaries
.
num
>
0
)
{
int32_t
code
=
saveTupleData
(
pCtx
,
rowIndex
,
pCtx
->
pSrcBlock
,
&
pItem
->
tuplePos
);
code
=
saveTupleData
(
pCtx
,
rowIndex
,
pCtx
->
pSrcBlock
,
&
item
.
tuplePos
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
return
code
;
}
}
}
}
taosHashPut
(
pInfo
->
pHash
,
data
,
hashKeyBytes
,
&
pItem
,
sizeof
(
SModeItem
*
));
taosHashPut
(
pInfo
->
pHash
,
data
,
hashKeyBytes
,
&
item
,
sizeof
(
SModeItem
));
pInfo
->
numOfPoints
++
;
}
else
{
}
else
{
(
*
pHashItem
)
->
count
+=
1
;
pHashItem
->
count
+=
1
;
if
(
pCtx
->
subsidiaries
.
num
>
0
)
{
if
(
pCtx
->
subsidiaries
.
num
>
0
)
{
int32_t
code
=
updateTupleData
(
pCtx
,
rowIndex
,
pCtx
->
pSrcBlock
,
&
((
*
pHashItem
)
->
tuplePos
)
);
int32_t
code
=
updateTupleData
(
pCtx
,
rowIndex
,
pCtx
->
pSrcBlock
,
&
pHashItem
->
tuplePos
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
return
code
;
}
}
}
}
}
}
return
TSDB_CODE_SUCCESS
;
return
code
;
}
}
int32_t
modeFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
modeFunction
(
SqlFunctionCtx
*
pCtx
)
{
...
@@ -5024,18 +5044,15 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) {
...
@@ -5024,18 +5044,15 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) {
char
*
data
=
colDataGetData
(
pInputCol
,
i
);
char
*
data
=
colDataGetData
(
pInputCol
,
i
);
int32_t
code
=
doModeAdd
(
pInfo
,
i
,
pCtx
,
data
);
int32_t
code
=
doModeAdd
(
pInfo
,
i
,
pCtx
,
data
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
modeFunctionCleanup
(
pInfo
);
return
code
;
return
code
;
}
}
if
(
sizeof
(
SModeInfo
)
+
pInfo
->
numOfPoints
*
(
sizeof
(
SModeItem
)
+
pInfo
->
colBytes
)
>=
MODE_MAX_RESULT_SIZE
)
{
taosHashCleanup
(
pInfo
->
pHash
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
}
if
(
numOfElems
==
0
&&
pCtx
->
subsidiaries
.
num
>
0
&&
!
pInfo
->
nullTupleSaved
)
{
if
(
numOfElems
==
0
&&
pCtx
->
subsidiaries
.
num
>
0
&&
!
pInfo
->
nullTupleSaved
)
{
int32_t
code
=
saveTupleData
(
pCtx
,
pInput
->
startRowIndex
,
pCtx
->
pSrcBlock
,
&
pInfo
->
nullTuplePos
);
int32_t
code
=
saveTupleData
(
pCtx
,
pInput
->
startRowIndex
,
pCtx
->
pSrcBlock
,
&
pInfo
->
nullTuplePos
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
modeFunctionCleanup
(
pInfo
);
return
code
;
return
code
;
}
}
pInfo
->
nullTupleSaved
=
true
;
pInfo
->
nullTupleSaved
=
true
;
...
@@ -5054,26 +5071,37 @@ int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
...
@@ -5054,26 +5071,37 @@ int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SColumnInfoData
*
pCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
slotId
);
SColumnInfoData
*
pCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
slotId
);
int32_t
currentRow
=
pBlock
->
info
.
rows
;
int32_t
currentRow
=
pBlock
->
info
.
rows
;
int32_t
resIndex
=
-
1
;
STuplePos
resDataPos
,
resTuplePos
;
int32_t
maxCount
=
0
;
int32_t
maxCount
=
0
;
for
(
int32_t
i
=
0
;
i
<
pInfo
->
numOfPoints
;
++
i
)
{
SModeItem
*
pItem
=
(
SModeItem
*
)(
pInfo
->
pItems
+
i
*
(
sizeof
(
SModeItem
)
+
pInfo
->
colBytes
));
void
*
pIter
=
taosHashIterate
(
pInfo
->
pHash
,
NULL
);
while
(
pIter
!=
NULL
)
{
SModeItem
*
pItem
=
(
SModeItem
*
)
pIter
;
if
(
pItem
->
count
>=
maxCount
)
{
if
(
pItem
->
count
>=
maxCount
)
{
maxCount
=
pItem
->
count
;
maxCount
=
pItem
->
count
;
resIndex
=
i
;
resDataPos
=
pItem
->
dataPos
;
resTuplePos
=
pItem
->
tuplePos
;
}
}
pIter
=
taosHashIterate
(
pInfo
->
pHash
,
pIter
);
}
}
if
(
maxCount
!=
0
)
{
if
(
maxCount
!=
0
)
{
SModeItem
*
pResItem
=
(
SModeItem
*
)(
pInfo
->
pItems
+
resIndex
*
(
sizeof
(
SModeItem
)
+
pInfo
->
colBytes
));
const
char
*
pData
=
loadTupleData
(
pCtx
,
&
resDataPos
);
colDataSetVal
(
pCol
,
currentRow
,
pResItem
->
data
,
false
);
if
(
pData
==
NULL
)
{
code
=
setSelectivityValue
(
pCtx
,
pBlock
,
&
pResItem
->
tuplePos
,
currentRow
);
code
=
TSDB_CODE_NO_AVAIL_DISK
;
modeFunctionCleanup
(
pInfo
);
return
code
;
}
colDataSetVal
(
pCol
,
currentRow
,
pData
,
false
);
code
=
setSelectivityValue
(
pCtx
,
pBlock
,
&
resTuplePos
,
currentRow
);
}
else
{
}
else
{
colDataSetNULL
(
pCol
,
currentRow
);
colDataSetNULL
(
pCol
,
currentRow
);
code
=
setSelectivityValue
(
pCtx
,
pBlock
,
&
pInfo
->
nullTuplePos
,
currentRow
);
code
=
setSelectivityValue
(
pCtx
,
pBlock
,
&
pInfo
->
nullTuplePos
,
currentRow
);
}
}
taosHashCleanup
(
pInfo
->
pHash
);
modeFunctionCleanup
(
pInfo
);
return
code
;
return
code
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录