Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2d85d019
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
2d85d019
编写于
2月 21, 2023
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix: add vgroup count
上级
c90e2aa7
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
75 addition
and
67 deletion
+75
-67
include/common/tcommon.h
include/common/tcommon.h
+10
-9
source/libs/executor/src/sysscanoperator.c
source/libs/executor/src/sysscanoperator.c
+37
-35
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+28
-23
未找到文件。
include/common/tcommon.h
浏览文件 @
2d85d019
...
...
@@ -195,7 +195,7 @@ typedef struct SDataBlockInfo {
uint32_t
capacity
;
SBlockID
id
;
int16_t
hasVarCol
;
int16_t
dataLoad
;
// denote if the data is loaded or not
int16_t
dataLoad
;
// denote if the data is loaded or not
// TODO: optimize and remove following
int64_t
version
;
// used for stream, and need serialization
...
...
@@ -204,8 +204,8 @@ typedef struct SDataBlockInfo {
STimeWindow
calWin
;
// used for stream, do not serialize
TSKEY
watermark
;
// used for stream
char
parTbName
[
TSDB_TABLE_NAME_LEN
];
// used for stream partition
STag
*
pTag
;
// used for stream partition
char
parTbName
[
TSDB_TABLE_NAME_LEN
];
// used for stream partition
STag
*
pTag
;
// used for stream partition
}
SDataBlockInfo
;
typedef
struct
SSDataBlock
{
...
...
@@ -239,22 +239,22 @@ typedef struct SVarColAttr {
// pBlockAgg->numOfNull == info.rows, all data are null
// pBlockAgg->numOfNull == 0, no data are null.
typedef
struct
SColumnInfoData
{
char
*
pData
;
// the corresponding block data in memory
char
*
pData
;
// the corresponding block data in memory
union
{
char
*
nullbitmap
;
// bitmap, one bit for each item in the list
SVarColAttr
varmeta
;
};
SColumnInfo
info
;
// column info
bool
hasNull
;
// if current column data has null value.
SColumnInfo
info
;
// column info
bool
hasNull
;
// if current column data has null value.
}
SColumnInfoData
;
typedef
struct
SQueryTableDataCond
{
uint64_t
suid
;
int32_t
order
;
// desc|asc order to iterate the data block
int32_t
order
;
// desc|asc order to iterate the data block
int32_t
numOfCols
;
SColumnInfo
*
colList
;
int32_t
*
pSlotList
;
// the column output destation slot, and it may be null
int32_t
type
;
// data block load type:
int32_t
*
pSlotList
;
// the column output destation slot, and it may be null
int32_t
type
;
// data block load type:
STimeWindow
twindows
;
int64_t
startVersion
;
int64_t
endVersion
;
...
...
@@ -300,6 +300,7 @@ typedef struct STableBlockDistInfo {
int32_t
firstSeekTimeUs
;
uint32_t
numOfInmemRows
;
uint32_t
numOfSmallBlocks
;
uint32_t
numOfVgroups
;
int32_t
blockRowsHisto
[
20
];
}
STableBlockDistInfo
;
...
...
source/libs/executor/src/sysscanoperator.c
浏览文件 @
2d85d019
...
...
@@ -81,10 +81,10 @@ typedef struct MergeIndex {
}
MergeIndex
;
typedef
struct
SBlockDistInfo
{
SSDataBlock
*
pResBlock
;
STsdbReader
*
pHandle
;
SReadHandle
readHandle
;
uint64_t
uid
;
// table uid
SSDataBlock
*
pResBlock
;
STsdbReader
*
pHandle
;
SReadHandle
readHandle
;
uint64_t
uid
;
// table uid
}
SBlockDistInfo
;
static
int32_t
sysChkFilter__Comm
(
SNode
*
pNode
);
...
...
@@ -129,10 +129,10 @@ static char* SYSTABLE_IDX_COLUMN[] = {"table_name", "db_name", "create_time"
static
char
*
SYSTABLE_SPECIAL_COL
[]
=
{
"db_name"
,
"vgroup_id"
};
static
int32_t
buildSysDbTableInfo
(
const
SSysTableScanInfo
*
pInfo
,
int32_t
capacity
);
static
SSDataBlock
*
buildInfoSchemaTableMetaBlock
(
char
*
tableName
);
static
void
destroySysScanOperator
(
void
*
param
);
static
int32_t
loadSysTableCallback
(
void
*
param
,
SDataBuf
*
pMsg
,
int32_t
code
);
static
int32_t
buildSysDbTableInfo
(
const
SSysTableScanInfo
*
pInfo
,
int32_t
capacity
);
static
SSDataBlock
*
buildInfoSchemaTableMetaBlock
(
char
*
tableName
);
static
void
destroySysScanOperator
(
void
*
param
);
static
int32_t
loadSysTableCallback
(
void
*
param
,
SDataBuf
*
pMsg
,
int32_t
code
);
static
__optSysFilter
optSysGetFilterFunc
(
int32_t
ctype
,
bool
*
reverse
);
static
int32_t
sysTableUserTagsFillOneTableTags
(
const
SSysTableScanInfo
*
pInfo
,
SMetaReader
*
smrSuperTable
,
...
...
@@ -199,11 +199,11 @@ int32_t sysFilte__TableName(void* arg, SNode* pNode, SArray* result) {
if
(
func
==
NULL
)
return
-
1
;
SMetaFltParam
param
=
{.
suid
=
0
,
.
cid
=
0
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
val
=
pVal
->
datum
.
p
,
.
reverse
=
reverse
,
.
filterFunc
=
func
};
.
cid
=
0
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
val
=
pVal
->
datum
.
p
,
.
reverse
=
reverse
,
.
filterFunc
=
func
};
return
-
1
;
}
...
...
@@ -218,11 +218,11 @@ int32_t sysFilte__CreateTime(void* arg, SNode* pNode, SArray* result) {
if
(
func
==
NULL
)
return
-
1
;
SMetaFltParam
param
=
{.
suid
=
0
,
.
cid
=
0
,
.
type
=
TSDB_DATA_TYPE_BIGINT
,
.
val
=
&
pVal
->
datum
.
i
,
.
reverse
=
reverse
,
.
filterFunc
=
func
};
.
cid
=
0
,
.
type
=
TSDB_DATA_TYPE_BIGINT
,
.
val
=
&
pVal
->
datum
.
i
,
.
reverse
=
reverse
,
.
filterFunc
=
func
};
int32_t
ret
=
metaFilterCreateTime
(
pMeta
,
&
param
,
result
);
return
ret
;
...
...
@@ -350,9 +350,9 @@ static int32_t optSysMergeRslt(SArray* mRslt, SArray* rslt);
static
SSDataBlock
*
sysTableScanFromMNode
(
SOperatorInfo
*
pOperator
,
SSysTableScanInfo
*
pInfo
,
const
char
*
name
,
SExecTaskInfo
*
pTaskInfo
);
void
extractTbnameSlotId
(
SSysTableScanInfo
*
pInfo
,
const
SScanPhysiNode
*
pScanNode
);
static
void
sysTableScanFillTbName
(
SOperatorInfo
*
pOperator
,
const
SSysTableScanInfo
*
pInfo
,
const
char
*
name
,
SSDataBlock
*
pBlock
);
__optSysFilter
optSysGetFilterFunc
(
int32_t
ctype
,
bool
*
reverse
)
{
static
void
sysTableScanFillTbName
(
SOperatorInfo
*
pOperator
,
const
SSysTableScanInfo
*
pInfo
,
const
char
*
name
,
SSDataBlock
*
pBlock
);
__optSysFilter
optSysGetFilterFunc
(
int32_t
ctype
,
bool
*
reverse
)
{
if
(
ctype
==
OP_TYPE_LOWER_EQUAL
||
ctype
==
OP_TYPE_LOWER_THAN
)
{
*
reverse
=
true
;
}
...
...
@@ -516,9 +516,10 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
metaTbCursorPrev
(
pInfo
->
pCur
);
blockFull
=
true
;
}
else
{
sysTableUserTagsFillOneTableTags
(
pInfo
,
&
smrSuperTable
,
&
pInfo
->
pCur
->
mr
,
dbname
,
tableName
,
&
numOfRows
,
dataBlock
);
sysTableUserTagsFillOneTableTags
(
pInfo
,
&
smrSuperTable
,
&
pInfo
->
pCur
->
mr
,
dbname
,
tableName
,
&
numOfRows
,
dataBlock
);
}
metaReaderClear
(
&
smrSuperTable
);
if
(
blockFull
||
numOfRows
>=
pOperator
->
resultInfo
.
capacity
)
{
...
...
@@ -1343,7 +1344,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
setOperatorCompleted
(
pOperator
);
}
return
pBlock
->
info
.
rows
>
0
?
pBlock
:
NULL
;
return
pBlock
->
info
.
rows
>
0
?
pBlock
:
NULL
;
}
else
{
return
NULL
;
}
...
...
@@ -1357,7 +1358,7 @@ static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScan
if
(
pInfo
->
tbnameSlotId
!=
-
1
)
{
SColumnInfoData
*
pColumnInfoData
=
(
SColumnInfoData
*
)
taosArrayGet
(
pBlock
->
pDataBlock
,
pInfo
->
tbnameSlotId
);
char
varTbName
[
TSDB_TABLE_FNAME_LEN
-
1
+
VARSTR_HEADER_SIZE
]
=
{
0
};
char
varTbName
[
TSDB_TABLE_FNAME_LEN
-
1
+
VARSTR_HEADER_SIZE
]
=
{
0
};
memcpy
(
varDataVal
(
varTbName
),
name
,
strlen
(
name
));
varDataSetLen
(
varTbName
,
strlen
(
name
));
...
...
@@ -1489,10 +1490,11 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
setOperatorInfo
(
pOperator
,
"SysTableScanOperator"
,
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
exprSupp
.
numOfExprs
=
taosArrayGetSize
(
pInfo
->
pRes
->
pDataBlock
);
pOperator
->
fpSet
=
createOperatorFpSet
(
optrDummyOpenFn
,
doSysTableScan
,
NULL
,
destroySysScanOperator
,
optrDefaultBufFn
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
optrDummyOpenFn
,
doSysTableScan
,
NULL
,
destroySysScanOperator
,
optrDefaultBufFn
,
NULL
);
return
pOperator
;
_error:
_error:
if
(
pInfo
!=
NULL
)
{
destroySysScanOperator
(
pInfo
);
}
...
...
@@ -1927,7 +1929,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
// make the valgrind happy that all memory buffer has been initialized already.
if
(
slotId
!=
0
)
{
SColumnInfoData
*
p1
=
taosArrayGet
(
pBlock
->
pDataBlock
,
0
);
int64_t
v
=
0
;
int64_t
v
=
0
;
colDataAppendInt64
(
p1
,
0
,
&
v
);
}
...
...
@@ -1937,10 +1939,10 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
}
static
void
destroyBlockDistScanOperatorInfo
(
void
*
param
)
{
SBlockDistInfo
*
pDistInfo
=
(
SBlockDistInfo
*
)
param
;
blockDataDestroy
(
pDistInfo
->
pResBlock
);
tsdbReaderClose
(
pDistInfo
->
pHandle
);
taosMemoryFreeClear
(
param
);
SBlockDistInfo
*
pDistInfo
=
(
SBlockDistInfo
*
)
param
;
blockDataDestroy
(
pDistInfo
->
pResBlock
);
tsdbReaderClose
(
pDistInfo
->
pHandle
);
taosMemoryFreeClear
(
param
);
}
static
int32_t
initTableblockDistQueryCond
(
uint64_t
uid
,
SQueryTableDataCond
*
pCond
)
{
...
...
@@ -2001,7 +2003,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
}
pInfo
->
readHandle
=
*
readHandle
;
pInfo
->
uid
=
(
pBlockScanNode
->
suid
!=
0
)
?
pBlockScanNode
->
suid
:
pBlockScanNode
->
uid
;
pInfo
->
uid
=
(
pBlockScanNode
->
suid
!=
0
)
?
pBlockScanNode
->
suid
:
pBlockScanNode
->
uid
;
int32_t
numOfCols
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pBlockScanNode
->
pScanPseudoCols
,
NULL
,
&
numOfCols
);
...
...
@@ -2012,8 +2014,8 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
setOperatorInfo
(
pOperator
,
"DataBlockDistScanOperator"
,
QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
optrDummyOpenFn
,
doBlockInfoScan
,
NULL
,
destroyBlockDistScanOperatorInfo
,
optrDefaultBufFn
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
optrDummyOpenFn
,
doBlockInfoScan
,
NULL
,
destroyBlockDistScanOperatorInfo
,
optrDefaultBufFn
,
NULL
);
return
pOperator
;
_error:
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
2d85d019
...
...
@@ -775,13 +775,13 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) {
static
int32_t
setNullSelectivityValue
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
int32_t
rowIndex
);
static
int32_t
setSelectivityValue
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
const
STuplePos
*
pTuplePos
,
int32_t
rowIndex
);
int32_t
rowIndex
);
int32_t
minmaxFunctionFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
SResultRowEntryInfo
*
pEntryInfo
=
GET_RES_INFO
(
pCtx
);
SMinmaxResInfo
*
pRes
=
GET_ROWCELL_INTERBUF
(
pEntryInfo
);
SMinmaxResInfo
*
pRes
=
GET_ROWCELL_INTERBUF
(
pEntryInfo
);
int32_t
slotId
=
pCtx
->
pExpr
->
base
.
resSchema
.
slotId
;
int32_t
currentRow
=
pBlock
->
info
.
rows
;
...
...
@@ -795,7 +795,7 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
case
TSDB_DATA_TYPE_UBIGINT
:
case
TSDB_DATA_TYPE_BIGINT
:
((
int64_t
*
)
pCol
->
pData
)[
currentRow
]
=
pRes
->
v
;
// colDataAppendInt64(pCol, currentRow, &pRes->v);
// colDataAppendInt64(pCol, currentRow, &pRes->v);
break
;
case
TSDB_DATA_TYPE_UINT
:
case
TSDB_DATA_TYPE_INT
:
...
...
@@ -1691,7 +1691,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
int32_t
percentileFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
)
{
SVariant
*
pVal
=
&
pCtx
->
param
[
1
].
param
;
int32_t
code
=
0
;
int32_t
code
=
0
;
double
v
=
0
;
GET_TYPED_DATA
(
v
,
double
,
pVal
->
nType
,
&
pVal
->
i
);
...
...
@@ -2070,7 +2070,7 @@ static void prepareBuf(SqlFunctionCtx* pCtx) {
}
static
int32_t
firstlastSaveTupleData
(
const
SSDataBlock
*
pSrcBlock
,
int32_t
rowIndex
,
SqlFunctionCtx
*
pCtx
,
SFirstLastRes
*
pInfo
)
{
SFirstLastRes
*
pInfo
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
pCtx
->
subsidiaries
.
num
<=
0
)
{
...
...
@@ -2123,7 +2123,8 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
}
// All null data column, return directly.
if
(
pInput
->
colDataSMAIsSet
&&
(
pInput
->
pColumnDataAgg
[
0
]
->
numOfNull
==
pInput
->
totalRows
)
&&
pInputCol
->
hasNull
==
true
)
{
if
(
pInput
->
colDataSMAIsSet
&&
(
pInput
->
pColumnDataAgg
[
0
]
->
numOfNull
==
pInput
->
totalRows
)
&&
pInputCol
->
hasNull
==
true
)
{
// save selectivity value for column consisted of all null values
int32_t
code
=
firstlastSaveTupleData
(
pCtx
->
pSrcBlock
,
pInput
->
startRowIndex
,
pCtx
,
pInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -2239,7 +2240,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
}
// All null data column, return directly.
if
(
pInput
->
colDataSMAIsSet
&&
(
pInput
->
pColumnDataAgg
[
0
]
->
numOfNull
==
pInput
->
totalRows
)
&&
pInputCol
->
hasNull
==
true
)
{
if
(
pInput
->
colDataSMAIsSet
&&
(
pInput
->
pColumnDataAgg
[
0
]
->
numOfNull
==
pInput
->
totalRows
)
&&
pInputCol
->
hasNull
==
true
)
{
// save selectivity value for column consisted of all null values
int32_t
code
=
firstlastSaveTupleData
(
pCtx
->
pSrcBlock
,
pInput
->
startRowIndex
,
pCtx
,
pInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -2333,7 +2335,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
}
if
(
pResInfo
->
numOfRes
==
0
||
pInfo
->
ts
<
cts
)
{
char
*
data
=
colDataGetData
(
pInputCol
,
chosen
);
char
*
data
=
colDataGetData
(
pInputCol
,
chosen
);
int32_t
code
=
doSaveCurrentVal
(
pCtx
,
i
,
cts
,
type
,
data
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
@@ -2344,7 +2346,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
for
(
int32_t
i
=
pInput
->
startRowIndex
+
round
*
4
;
i
<
pInput
->
startRowIndex
+
pInput
->
numOfRows
;
++
i
)
{
if
(
pResInfo
->
numOfRes
==
0
||
pInfo
->
ts
<
pts
[
i
])
{
char
*
data
=
colDataGetData
(
pInputCol
,
i
);
char
*
data
=
colDataGetData
(
pInputCol
,
i
);
int32_t
code
=
doSaveCurrentVal
(
pCtx
,
i
,
pts
[
i
],
type
,
data
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
@@ -2361,7 +2363,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
numOfElems
++
;
if
(
pResInfo
->
numOfRes
==
0
||
pInfo
->
ts
<
pts
[
i
])
{
char
*
data
=
colDataGetData
(
pInputCol
,
i
);
char
*
data
=
colDataGetData
(
pInputCol
,
i
);
int32_t
code
=
doSaveCurrentVal
(
pCtx
,
i
,
pts
[
i
],
type
,
data
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
@@ -2408,7 +2410,7 @@ static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* p
}
static
int32_t
firstLastTransferInfo
(
SqlFunctionCtx
*
pCtx
,
SFirstLastRes
*
pInput
,
SFirstLastRes
*
pOutput
,
bool
isFirst
,
int32_t
rowIndex
)
{
int32_t
rowIndex
)
{
if
(
TSDB_CODE_SUCCESS
==
firstLastTransferInfoImpl
(
pInput
,
pOutput
,
isFirst
))
{
int32_t
code
=
firstlastSaveTupleData
(
pCtx
->
pSrcBlock
,
rowIndex
,
pCtx
,
pOutput
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -2435,7 +2437,7 @@ static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuer
for
(
int32_t
i
=
start
;
i
<
start
+
pInput
->
numOfRows
;
++
i
)
{
char
*
data
=
colDataGetData
(
pCol
,
i
);
SFirstLastRes
*
pInputInfo
=
(
SFirstLastRes
*
)
varDataVal
(
data
);
int32_t
code
=
firstLastTransferInfo
(
pCtx
,
pInputInfo
,
pInfo
,
isFirstQuery
,
i
);
int32_t
code
=
firstLastTransferInfo
(
pCtx
,
pInputInfo
,
pInfo
,
isFirstQuery
,
i
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
@@ -2667,7 +2669,7 @@ static int32_t doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
}
static
int32_t
doHandleDiff
(
SDiffInfo
*
pDiffInfo
,
int32_t
type
,
const
char
*
pv
,
SColumnInfoData
*
pOutput
,
int32_t
pos
,
int32_t
order
,
int64_t
ts
)
{
int32_t
order
,
int64_t
ts
)
{
int32_t
factor
=
(
order
==
TSDB_ORDER_ASC
)
?
1
:
-
1
;
pDiffInfo
->
prevTs
=
ts
;
switch
(
type
)
{
...
...
@@ -2875,8 +2877,8 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) {
return
pRes
;
}
static
int32_t
doAddIntoResult
(
SqlFunctionCtx
*
pCtx
,
void
*
pData
,
int32_t
rowIndex
,
SSDataBlock
*
pSrcBlock
,
uint16_t
type
,
uint64_t
uid
,
SResultRowEntryInfo
*
pEntryInfo
,
bool
isTopQuery
);
static
int32_t
doAddIntoResult
(
SqlFunctionCtx
*
pCtx
,
void
*
pData
,
int32_t
rowIndex
,
SSDataBlock
*
pSrcBlock
,
uint
16_t
type
,
uint
64_t
uid
,
SResultRowEntryInfo
*
pEntryInfo
,
bool
isTopQuery
);
static
void
addResult
(
SqlFunctionCtx
*
pCtx
,
STopBotResItem
*
pSourceItem
,
int16_t
type
,
bool
isTopQuery
);
...
...
@@ -2897,7 +2899,7 @@ int32_t topFunction(SqlFunctionCtx* pCtx) {
}
numOfElems
++
;
char
*
data
=
colDataGetData
(
pCol
,
i
);
char
*
data
=
colDataGetData
(
pCol
,
i
);
int32_t
code
=
doAddIntoResult
(
pCtx
,
data
,
i
,
pCtx
->
pSrcBlock
,
pRes
->
type
,
pInput
->
uid
,
pResInfo
,
true
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
@@ -2931,7 +2933,7 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) {
}
numOfElems
++
;
char
*
data
=
colDataGetData
(
pCol
,
i
);
char
*
data
=
colDataGetData
(
pCol
,
i
);
int32_t
code
=
doAddIntoResult
(
pCtx
,
data
,
i
,
pCtx
->
pSrcBlock
,
pRes
->
type
,
pInput
->
uid
,
pResInfo
,
false
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
@@ -2983,7 +2985,7 @@ static int32_t topBotResComparFn(const void* p1, const void* p2, const void* par
}
int32_t
doAddIntoResult
(
SqlFunctionCtx
*
pCtx
,
void
*
pData
,
int32_t
rowIndex
,
SSDataBlock
*
pSrcBlock
,
uint16_t
type
,
uint64_t
uid
,
SResultRowEntryInfo
*
pEntryInfo
,
bool
isTopQuery
)
{
uint64_t
uid
,
SResultRowEntryInfo
*
pEntryInfo
,
bool
isTopQuery
)
{
STopBotRes
*
pRes
=
getTopBotOutputInfo
(
pCtx
);
SVariant
val
=
{
0
};
...
...
@@ -3174,7 +3176,7 @@ static char* doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPo
if
(
pPage
==
NULL
)
{
return
NULL
;
}
char
*
p
=
pPage
->
data
+
pPos
->
offset
;
char
*
p
=
pPage
->
data
+
pPos
->
offset
;
releaseBufPage
(
pHandle
->
pBuf
,
pPage
);
return
p
;
}
else
{
...
...
@@ -4635,7 +4637,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
continue
;
}
char
*
data
=
colDataGetData
(
pInputCol
,
i
);
char
*
data
=
colDataGetData
(
pInputCol
,
i
);
int32_t
code
=
doReservoirSample
(
pCtx
,
pInfo
,
data
,
i
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
@@ -4655,7 +4657,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
}
int32_t
sampleFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
SResultRowEntryInfo
*
pEntryInfo
=
GET_RES_INFO
(
pCtx
);
SSampleInfo
*
pInfo
=
getSampleOutputInfo
(
pCtx
);
...
...
@@ -4989,7 +4991,7 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) {
}
numOfElems
++
;
char
*
data
=
colDataGetData
(
pInputCol
,
i
);
char
*
data
=
colDataGetData
(
pInputCol
,
i
);
int32_t
code
=
doModeAdd
(
pInfo
,
i
,
pCtx
,
data
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
@@ -5410,6 +5412,7 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
if
(
pDistInfo
->
maxRows
<
p1
.
maxRows
)
{
pDistInfo
->
maxRows
=
p1
.
maxRows
;
}
pDistInfo
->
numOfVgroups
+=
(
p1
.
numOfTables
!=
0
?
1
:
0
);
for
(
int32_t
i
=
0
;
i
<
tListLen
(
pDistInfo
->
blockRowsHisto
);
++
i
)
{
pDistInfo
->
blockRowsHisto
[
i
]
+=
p1
.
blockRowsHisto
[
i
];
...
...
@@ -5438,6 +5441,7 @@ int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDist
if
(
tEncodeI32
(
&
encoder
,
pInfo
->
defMinRows
)
<
0
)
return
-
1
;
if
(
tEncodeU32
(
&
encoder
,
pInfo
->
numOfInmemRows
)
<
0
)
return
-
1
;
if
(
tEncodeU32
(
&
encoder
,
pInfo
->
numOfSmallBlocks
)
<
0
)
return
-
1
;
if
(
tEncodeU32
(
&
encoder
,
pInfo
->
numOfVgroups
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
tListLen
(
pInfo
->
blockRowsHisto
);
++
i
)
{
if
(
tEncodeI32
(
&
encoder
,
pInfo
->
blockRowsHisto
[
i
])
<
0
)
return
-
1
;
...
...
@@ -5469,6 +5473,7 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo
if
(
tDecodeI32
(
&
decoder
,
&
pInfo
->
defMinRows
)
<
0
)
return
-
1
;
if
(
tDecodeU32
(
&
decoder
,
&
pInfo
->
numOfInmemRows
)
<
0
)
return
-
1
;
if
(
tDecodeU32
(
&
decoder
,
&
pInfo
->
numOfSmallBlocks
)
<
0
)
return
-
1
;
if
(
tDecodeU32
(
&
decoder
,
&
pInfo
->
numOfVgroups
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
tListLen
(
pInfo
->
blockRowsHisto
);
++
i
)
{
if
(
tDecodeI32
(
&
decoder
,
&
pInfo
->
blockRowsHisto
[
i
])
<
0
)
return
-
1
;
...
...
@@ -5520,7 +5525,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
colDataAppend
(
pColInfo
,
row
++
,
st
,
false
);
len
=
sprintf
(
st
+
VARSTR_HEADER_SIZE
,
"Total_Tables=[%d] Total_Files=[%d] Total_Vgroups=[%d]"
,
pData
->
numOfTables
,
pData
->
numOfFiles
,
0
);
pData
->
numOfFiles
,
pData
->
numOfVgroups
);
varDataSetLen
(
st
,
len
);
colDataAppend
(
pColInfo
,
row
++
,
st
,
false
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录