Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c5812127
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
c5812127
编写于
8月 28, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
8月 28, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #16439 from taosdata/feature/3_liaohj
feature(query): support last function cache and retrieve data.
上级
e0748f43
3c6aea81
变更
15
隐藏空白更改
内联
并排
Showing
15 changed file
with
195 addition
and
194 deletion
+195
-194
include/libs/function/function.h
include/libs/function/function.h
+1
-2
include/util/tpagedbuf.h
include/util/tpagedbuf.h
+1
-9
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+7
-5
source/dnode/vnode/src/tsdb/tsdbCacheRead.c
source/dnode/vnode/src/tsdb/tsdbCacheRead.c
+54
-26
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+17
-15
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-1
source/libs/executor/src/cachescanoperator.c
source/libs/executor/src/cachescanoperator.c
+25
-18
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+0
-1
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+5
-13
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+1
-1
source/libs/executor/src/tsort.c
source/libs/executor/src/tsort.c
+12
-7
source/libs/function/inc/tpercentile.h
source/libs/function/inc/tpercentile.h
+14
-14
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+1
-1
source/libs/function/src/tpercentile.c
source/libs/function/src/tpercentile.c
+25
-9
source/util/src/tpagedbuf.c
source/util/src/tpagedbuf.c
+31
-72
未找到文件。
include/libs/function/function.h
浏览文件 @
c5812127
...
...
@@ -139,9 +139,8 @@ typedef struct SqlFunctionCtx {
struct
SExprInfo
*
pExpr
;
struct
SDiskbasedBuf
*
pBuf
;
struct
SSDataBlock
*
pSrcBlock
;
struct
SSDataBlock
*
pDstBlock
;
// used by ind
i
finite rows function to set selectivity
struct
SSDataBlock
*
pDstBlock
;
// used by ind
e
finite rows function to set selectivity
int32_t
curBufPage
;
bool
increase
;
bool
isStream
;
char
udfName
[
TSDB_FUNC_NAME_LEN
];
...
...
include/util/tpagedbuf.h
浏览文件 @
c5812127
...
...
@@ -67,10 +67,9 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId);
/**
*
* @param pBuf
* @param groupId
* @return
*/
SIDList
getDataBufPagesIdList
(
SDiskbasedBuf
*
pBuf
,
int32_t
groupId
);
SIDList
getDataBufPagesIdList
(
SDiskbasedBuf
*
pBuf
);
/**
* get the specified buffer page by id
...
...
@@ -101,13 +100,6 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, struct SPageInfo* pi);
*/
size_t
getTotalBufSize
(
const
SDiskbasedBuf
*
pBuf
);
/**
* get the number of groups in the result buffer
* @param pBuf
* @return
*/
size_t
getNumOfBufGroupId
(
const
SDiskbasedBuf
*
pBuf
);
/**
* destroy result buffer
* @param pBuf
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
c5812127
...
...
@@ -128,8 +128,10 @@ typedef struct STsdbReader STsdbReader;
#define TIMEWINDOW_RANGE_CONTAINED 1
#define TIMEWINDOW_RANGE_EXTERNAL 2
#define LASTROW_RETRIEVE_TYPE_ALL 0x1
#define LASTROW_RETRIEVE_TYPE_SINGLE 0x2
#define CACHESCAN_RETRIEVE_TYPE_ALL 0x1
#define CACHESCAN_RETRIEVE_TYPE_SINGLE 0x2
#define CACHESCAN_RETRIEVE_LAST_ROW 0x4
#define CACHESCAN_RETRIEVE_LAST 0x8
int32_t
tsdbSetTableId
(
STsdbReader
*
pReader
,
int64_t
uid
);
int32_t
tsdbReaderOpen
(
SVnode
*
pVnode
,
SQueryTableDataCond
*
pCond
,
SArray
*
pTableList
,
STsdbReader
**
ppReader
,
...
...
@@ -146,9 +148,9 @@ void *tsdbGetIdx(SMeta *pMeta);
void
*
tsdbGetIvtIdx
(
SMeta
*
pMeta
);
uint64_t
getReaderMaxVersion
(
STsdbReader
*
pReader
);
int32_t
tsdb
LastRow
ReaderOpen
(
void
*
pVnode
,
int32_t
type
,
SArray
*
pTableIdList
,
int32_t
numOfCols
,
void
**
pReader
);
int32_t
tsdbRetrieve
LastRow
(
void
*
pReader
,
SSDataBlock
*
pResBlock
,
const
int32_t
*
slotIds
,
SArray
*
pTableUids
);
int32_t
tsdb
Lastrow
ReaderClose
(
void
*
pReader
);
int32_t
tsdb
Cacherows
ReaderOpen
(
void
*
pVnode
,
int32_t
type
,
SArray
*
pTableIdList
,
int32_t
numOfCols
,
void
**
pReader
);
int32_t
tsdbRetrieve
CacheRows
(
void
*
pReader
,
SSDataBlock
*
pResBlock
,
const
int32_t
*
slotIds
,
SArray
*
pTableUids
);
int32_t
tsdb
Cacherows
ReaderClose
(
void
*
pReader
);
int32_t
tsdbGetTableSchema
(
SVnode
*
pVnode
,
int64_t
uid
,
STSchema
**
pSchema
,
int64_t
*
suid
);
void
tsdbCacheSetCapacity
(
SVnode
*
pVnode
,
size_t
capacity
);
...
...
source/dnode/vnode/src/tsdb/tsdbCacheRead.c
浏览文件 @
c5812127
...
...
@@ -18,7 +18,7 @@
#include "tcommon.h"
#include "tsdb.h"
typedef
struct
S
Lastrow
Reader
{
typedef
struct
S
CacheRows
Reader
{
SVnode
*
pVnode
;
STSchema
*
pSchema
;
uint64_t
uid
;
...
...
@@ -27,9 +27,9 @@ typedef struct SLastrowReader {
int32_t
type
;
int32_t
tableIndex
;
// currently returned result tables
SArray
*
pTableList
;
// table id list
}
S
Lastrow
Reader
;
}
S
CacheRows
Reader
;
static
void
saveOneRow
(
STSRow
*
pRow
,
SSDataBlock
*
pBlock
,
S
Lastrow
Reader
*
pReader
,
const
int32_t
*
slotIds
)
{
static
void
saveOneRow
(
STSRow
*
pRow
,
SSDataBlock
*
pBlock
,
S
CacheRows
Reader
*
pReader
,
const
int32_t
*
slotIds
)
{
ASSERT
(
pReader
->
numOfCols
<=
taosArrayGetSize
(
pBlock
->
pDataBlock
));
int32_t
numOfRows
=
pBlock
->
info
.
rows
;
...
...
@@ -61,8 +61,10 @@ static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReade
pBlock
->
info
.
rows
+=
1
;
}
int32_t
tsdbLastRowReaderOpen
(
void
*
pVnode
,
int32_t
type
,
SArray
*
pTableIdList
,
int32_t
numOfCols
,
void
**
pReader
)
{
SLastrowReader
*
p
=
taosMemoryCalloc
(
1
,
sizeof
(
SLastrowReader
));
int32_t
tsdbCacherowsReaderOpen
(
void
*
pVnode
,
int32_t
type
,
SArray
*
pTableIdList
,
int32_t
numOfCols
,
void
**
pReader
)
{
*
pReader
=
NULL
;
SCacheRowsReader
*
p
=
taosMemoryCalloc
(
1
,
sizeof
(
SCacheRowsReader
));
if
(
p
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -81,9 +83,17 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList,
p
->
pTableList
=
pTableIdList
;
p
->
transferBuf
=
taosMemoryCalloc
(
p
->
pSchema
->
numOfCols
,
POINTER_BYTES
);
if
(
p
->
transferBuf
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
for
(
int32_t
i
=
0
;
i
<
p
->
pSchema
->
numOfCols
;
++
i
)
{
if
(
IS_VAR_DATA_TYPE
(
p
->
pSchema
->
columns
[
i
].
type
))
{
p
->
transferBuf
[
i
]
=
taosMemoryMalloc
(
p
->
pSchema
->
columns
[
i
].
bytes
);
if
(
p
->
transferBuf
[
i
]
==
NULL
)
{
tsdbCacherowsReaderClose
(
p
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
}
...
...
@@ -91,8 +101,8 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList,
return
TSDB_CODE_SUCCESS
;
}
int32_t
tsdb
Lastrow
ReaderClose
(
void
*
pReader
)
{
S
Lastrow
Reader
*
p
=
pReader
;
int32_t
tsdb
Cacherows
ReaderClose
(
void
*
pReader
)
{
S
CacheRows
Reader
*
p
=
pReader
;
if
(
p
->
pSchema
!=
NULL
)
{
for
(
int32_t
i
=
0
;
i
<
p
->
pSchema
->
numOfCols
;
++
i
)
{
...
...
@@ -107,28 +117,56 @@ int32_t tsdbLastrowReaderClose(void* pReader) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
tsdbRetrieveLastRow
(
void
*
pReader
,
SSDataBlock
*
pResBlock
,
const
int32_t
*
slotIds
,
SArray
*
pTableUidList
)
{
static
int32_t
doExtractCacheRow
(
SCacheRowsReader
*
pr
,
SLRUCache
*
lruCache
,
uint64_t
uid
,
STSRow
**
pRow
,
LRUHandle
**
h
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
((
pr
->
type
&
CACHESCAN_RETRIEVE_LAST_ROW
)
==
CACHESCAN_RETRIEVE_LAST_ROW
)
{
code
=
tsdbCacheGetLastrowH
(
lruCache
,
uid
,
pr
->
pVnode
->
pTsdb
,
h
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
// no data in the table of Uid
if
(
*
h
!=
NULL
)
{
*
pRow
=
(
STSRow
*
)
taosLRUCacheValue
(
lruCache
,
*
h
);
}
}
else
{
code
=
tsdbCacheGetLastH
(
lruCache
,
uid
,
pr
->
pVnode
->
pTsdb
,
h
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
// no data in the table of Uid
if
(
*
h
!=
NULL
)
{
SArray
*
pLast
=
(
SArray
*
)
taosLRUCacheValue
(
lruCache
,
*
h
);
tsdbCacheLastArray2Row
(
pLast
,
pRow
,
pr
->
pSchema
);
}
}
return
code
;
}
int32_t
tsdbRetrieveCacheRows
(
void
*
pReader
,
SSDataBlock
*
pResBlock
,
const
int32_t
*
slotIds
,
SArray
*
pTableUidList
)
{
if
(
pReader
==
NULL
||
pResBlock
==
NULL
)
{
return
TSDB_CODE_INVALID_PARA
;
}
S
Lastrow
Reader
*
pr
=
pReader
;
S
CacheRows
Reader
*
pr
=
pReader
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
SLRUCache
*
lruCache
=
pr
->
pVnode
->
pTsdb
->
lruCache
;
LRUHandle
*
h
=
NULL
;
STSRow
*
pRow
=
NULL
;
size_t
numOfTables
=
taosArrayGetSize
(
pr
->
pTableList
);
// retrieve the only one last row of all tables in the uid list.
if
(
pr
->
type
==
LASTROW
_RETRIEVE_TYPE_SINGLE
)
{
if
(
(
pr
->
type
&
CACHESCAN_RETRIEVE_TYPE_SINGLE
)
==
CACHESCAN
_RETRIEVE_TYPE_SINGLE
)
{
int64_t
lastKey
=
INT64_MIN
;
bool
internalResult
=
false
;
for
(
int32_t
i
=
0
;
i
<
numOfTables
;
++
i
)
{
STableKeyInfo
*
pKeyInfo
=
taosArrayGet
(
pr
->
pTableList
,
i
);
int32_t
code
=
tsdbCacheGetLastrowH
(
lruCache
,
pKeyInfo
->
uid
,
pr
->
pVnode
->
pTsdb
,
&
h
);
// int32_t code = tsdbCacheGetLastH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
code
=
doExtractCacheRow
(
pr
,
lruCache
,
pKeyInfo
->
uid
,
&
pRow
,
&
h
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
@@ -136,9 +174,6 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
continue
;
}
pRow
=
(
STSRow
*
)
taosLRUCacheValue
(
lruCache
,
h
);
// SArray* pLast = (SArray*)taosLRUCacheValue(lruCache, h);
// tsdbCacheLastArray2Row(pLast, &pRow, pr->pSchema);
if
(
pRow
->
ts
>
lastKey
)
{
// Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already
// appended or not.
...
...
@@ -155,25 +190,18 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
tsdbCacheRelease
(
lruCache
,
h
);
}
}
else
if
(
pr
->
type
==
LASTROW
_RETRIEVE_TYPE_ALL
)
{
}
else
if
(
(
pr
->
type
&
CACHESCAN_RETRIEVE_TYPE_ALL
)
==
CACHESCAN
_RETRIEVE_TYPE_ALL
)
{
for
(
int32_t
i
=
pr
->
tableIndex
;
i
<
numOfTables
;
++
i
)
{
STableKeyInfo
*
pKeyInfo
=
taosArrayGet
(
pr
->
pTableList
,
i
);
int32_t
code
=
tsdbCacheGetLastrowH
(
lruCache
,
pKeyInfo
->
uid
,
pr
->
pVnode
->
pTsdb
,
&
h
);
// int32_t code = tsdbCacheGetLastH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
code
=
doExtractCacheRow
(
pr
,
lruCache
,
pKeyInfo
->
uid
,
&
pRow
,
&
h
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
// no data in the table of Uid
if
(
h
==
NULL
)
{
continue
;
}
pRow
=
(
STSRow
*
)
taosLRUCacheValue
(
lruCache
,
h
);
// SArray* pLast = (SArray*)taosLRUCacheValue(lruCache, h);
// tsdbCacheLastArray2Row(pLast, &pRow, pr->pSchema);
saveOneRow
(
pRow
,
pResBlock
,
pr
,
slotIds
);
taosArrayPush
(
pTableUidList
,
&
pKeyInfo
->
uid
);
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
c5812127
...
...
@@ -16,9 +16,9 @@
#include "osDef.h"
#include "tsdb.h"
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
#define ASCENDING_TRAVERSE(o)
(o == TSDB_ORDER_ASC)
#define ALL_ROWS_CHECKED_INDEX (INT16_MIN)
#define
DEFAULT
_ROW_INDEX_VAL (-1)
#define
INITIAL
_ROW_INDEX_VAL (-1)
typedef
enum
{
EXTERNAL_ROWS_PREV
=
0x1
,
...
...
@@ -234,7 +234,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
}
for
(
int32_t
j
=
0
;
j
<
numOfTables
;
++
j
)
{
STableBlockScanInfo
info
=
{.
lastKey
=
0
,
.
uid
=
idList
[
j
].
uid
,
.
indexInBlockL
=
DEFAULT
_ROW_INDEX_VAL
};
STableBlockScanInfo
info
=
{.
lastKey
=
0
,
.
uid
=
idList
[
j
].
uid
,
.
indexInBlockL
=
INITIAL
_ROW_INDEX_VAL
};
if
(
ASCENDING_TRAVERSE
(
pTsdbReader
->
order
))
{
if
(
info
.
lastKey
==
INT64_MIN
||
info
.
lastKey
<
pTsdbReader
->
window
.
skey
)
{
info
.
lastKey
=
pTsdbReader
->
window
.
skey
;
...
...
@@ -266,7 +266,9 @@ static void resetDataBlockScanInfo(SHashObj* pTableMap) {
p
->
iter
.
iter
=
tsdbTbDataIterDestroy
(
p
->
iter
.
iter
);
}
p
->
delSkyline
=
taosArrayDestroy
(
p
->
delSkyline
);
p
->
fileDelIndex
=
-
1
;
p
->
delSkyline
=
taosArrayDestroy
(
p
->
delSkyline
);
p
->
lastBlockDelIndex
=
INITIAL_ROW_INDEX_VAL
;
}
}
...
...
@@ -414,7 +416,7 @@ _err:
return
false
;
}
static
void
resetDataBlockIterator
(
SDataBlockIter
*
pIter
,
int32_t
order
,
SHashObj
*
pTableMap
)
{
static
void
resetDataBlockIterator
(
SDataBlockIter
*
pIter
,
int32_t
order
)
{
pIter
->
order
=
order
;
pIter
->
index
=
-
1
;
pIter
->
numOfBlocks
=
0
;
...
...
@@ -423,7 +425,6 @@ static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, SHashOb
}
else
{
taosArrayClear
(
pIter
->
blockList
);
}
pIter
->
pTableMap
=
pTableMap
;
}
static
void
cleanupDataBlockIterator
(
SDataBlockIter
*
pIter
)
{
taosArrayDestroy
(
pIter
->
blockList
);
}
...
...
@@ -579,7 +580,7 @@ static void cleanupTableScanInfo(SHashObj* pTableMap) {
}
// reset the index in last block when handing a new file
px
->
indexInBlockL
=
DEFAULT
_ROW_INDEX_VAL
;
px
->
indexInBlockL
=
INITIAL
_ROW_INDEX_VAL
;
tMapDataClear
(
&
px
->
mapData
);
taosArrayClear
(
px
->
pBlockList
);
}
...
...
@@ -887,6 +888,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
pBlockIter
->
numOfBlocks
=
numOfBlocks
;
taosArrayClear
(
pBlockIter
->
blockList
);
pBlockIter
->
pTableMap
=
pReader
->
status
.
pTableMap
;
// access data blocks according to the offset of each block in asc/desc order.
int32_t
numOfTables
=
(
int32_t
)
taosHashGetSize
(
pReader
->
status
.
pTableMap
);
...
...
@@ -2403,7 +2405,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
initLastBlockReader
(
pLastBlockReader
,
pScanInfo
->
uid
,
&
pScanInfo
->
indexInBlockL
);
int32_t
index
=
pScanInfo
->
indexInBlockL
;
if
(
index
==
DEFAULT
_ROW_INDEX_VAL
||
index
==
pLastBlockReader
->
lastBlockData
.
nRow
)
{
if
(
index
==
INITIAL
_ROW_INDEX_VAL
||
index
==
pLastBlockReader
->
lastBlockData
.
nRow
)
{
bool
hasData
=
nextRowInLastBlock
(
pLastBlockReader
,
pScanInfo
);
if
(
!
hasData
)
{
// current table does not have rows in last block, try next table
bool
hasNexTable
=
moveToNextTable
(
pOrderedCheckInfo
,
pStatus
);
...
...
@@ -2470,7 +2472,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
// note: the lastblock may be null here
initLastBlockReader
(
pLastBlockReader
,
pScanInfo
->
uid
,
&
pScanInfo
->
indexInBlockL
);
if
(
pScanInfo
->
indexInBlockL
==
DEFAULT
_ROW_INDEX_VAL
||
pScanInfo
->
indexInBlockL
==
pLastBlockReader
->
lastBlockData
.
nRow
)
{
if
(
pScanInfo
->
indexInBlockL
==
INITIAL
_ROW_INDEX_VAL
||
pScanInfo
->
indexInBlockL
==
pLastBlockReader
->
lastBlockData
.
nRow
)
{
bool
hasData
=
nextRowInLastBlock
(
pLastBlockReader
,
pScanInfo
);
}
}
...
...
@@ -2582,7 +2584,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl
code
=
initBlockIterator
(
pReader
,
pBlockIter
,
num
.
numOfBlocks
);
}
else
{
// no block data, only last block exists
tBlockDataReset
(
&
pReader
->
status
.
fileBlockData
);
resetDataBlockIterator
(
pBlockIter
,
pReader
->
order
,
pReader
->
status
.
pTableMap
);
resetDataBlockIterator
(
pBlockIter
,
pReader
->
order
);
}
SLastBlockReader
*
pLReader
=
pReader
->
status
.
fileIter
.
pLastBlockReader
;
...
...
@@ -2654,7 +2656,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
initBlockDumpInfo
(
pReader
,
pBlockIter
);
}
else
if
(
taosArrayGetSize
(
pReader
->
status
.
fileIter
.
pLastBlockReader
->
pBlockL
)
>
0
)
{
// data blocks in current file are exhausted, let's try the next file now
tBlockDataReset
(
&
pReader
->
status
.
fileBlockData
);
resetDataBlockIterator
(
pBlockIter
,
pReader
->
order
,
pReader
->
status
.
pTableMap
);
resetDataBlockIterator
(
pBlockIter
,
pReader
->
order
);
goto
_begin
;
}
else
{
code
=
initForFirstBlockInFile
(
pReader
,
pBlockIter
);
...
...
@@ -3276,7 +3278,7 @@ int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) {
ASSERT
(
pReader
!=
NULL
);
taosHashClear
(
pReader
->
status
.
pTableMap
);
STableBlockScanInfo
info
=
{.
lastKey
=
0
,
.
uid
=
uid
,
.
indexInBlockL
=
DEFAULT
_ROW_INDEX_VAL
};
STableBlockScanInfo
info
=
{.
lastKey
=
0
,
.
uid
=
uid
,
.
indexInBlockL
=
INITIAL
_ROW_INDEX_VAL
};
taosHashPut
(
pReader
->
status
.
pTableMap
,
&
info
.
uid
,
sizeof
(
uint64_t
),
&
info
,
sizeof
(
info
));
return
TDB_CODE_SUCCESS
;
}
...
...
@@ -3372,7 +3374,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
SDataBlockIter
*
pBlockIter
=
&
pReader
->
status
.
blockIter
;
initFilesetIterator
(
&
pReader
->
status
.
fileIter
,
pReader
->
pReadSnap
->
fs
.
aDFileSet
,
pReader
);
resetDataBlockIterator
(
&
pReader
->
status
.
blockIter
,
pReader
->
order
,
pReader
->
status
.
pTableMap
);
resetDataBlockIterator
(
&
pReader
->
status
.
blockIter
,
pReader
->
order
);
// no data in files, let's try buffer in memory
if
(
pReader
->
status
.
fileIter
.
numOfFiles
==
0
)
{
...
...
@@ -3393,7 +3395,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
}
initFilesetIterator
(
&
pPrevReader
->
status
.
fileIter
,
pPrevReader
->
pReadSnap
->
fs
.
aDFileSet
,
pPrevReader
);
resetDataBlockIterator
(
&
pPrevReader
->
status
.
blockIter
,
pPrevReader
->
order
,
pReader
->
status
.
pTableMap
);
resetDataBlockIterator
(
&
pPrevReader
->
status
.
blockIter
,
pPrevReader
->
order
);
// no data in files, let's try buffer in memory
if
(
pPrevReader
->
status
.
fileIter
.
numOfFiles
==
0
)
{
...
...
@@ -3696,7 +3698,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
tsdbDataFReaderClose
(
&
pReader
->
pFileReader
);
initFilesetIterator
(
&
pReader
->
status
.
fileIter
,
pReader
->
pReadSnap
->
fs
.
aDFileSet
,
pReader
);
resetDataBlockIterator
(
&
pReader
->
status
.
blockIter
,
pReader
->
order
,
pReader
->
status
.
pTableMap
);
resetDataBlockIterator
(
&
pReader
->
status
.
blockIter
,
pReader
->
order
);
resetDataBlockScanInfo
(
pReader
->
status
.
pTableMap
);
int32_t
code
=
0
;
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
c5812127
...
...
@@ -909,7 +909,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
SOperatorInfo
*
createProjectOperatorInfo
(
SOperatorInfo
*
downstream
,
SProjectPhysiNode
*
pProjPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSortOperatorInfo
(
SOperatorInfo
*
downstream
,
SSortPhysiNode
*
pSortNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createMultiwayMergeOperatorInfo
(
SOperatorInfo
**
dowStreams
,
size_t
numStreams
,
SMergePhysiNode
*
pMergePhysiNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
create
Lastrow
ScanOperator
(
SLastRowScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
create
Cacherows
ScanOperator
(
SLastRowScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SInterval
*
pInterval
,
int32_t
primaryTsSlotId
,
...
...
source/libs/executor/src/cachescanoperator.c
浏览文件 @
c5812127
...
...
@@ -25,24 +25,27 @@
#include "thash.h"
#include "ttypes.h"
static
SSDataBlock
*
doScan
Lastrow
(
SOperatorInfo
*
pOperator
);
static
SSDataBlock
*
doScan
Cache
(
SOperatorInfo
*
pOperator
);
static
void
destroyLastrowScanOperator
(
void
*
param
);
static
int32_t
extractTargetSlotId
(
const
SArray
*
pColMatchInfo
,
SExecTaskInfo
*
pTaskInfo
,
int32_t
**
pSlotIds
);
SOperatorInfo
*
createLastrowScanOperator
(
SLastRowScanPhysiNode
*
pScanNode
,
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
)
{
SOperatorInfo
*
createCacherowsScanOperator
(
SLastRowScanPhysiNode
*
pScanNode
,
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
SLastrowScanInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SLastrowScanInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
}
pInfo
->
readHandle
=
*
readHandle
;
pInfo
->
pRes
=
createResDataBlock
(
pScanNode
->
scan
.
node
.
pOutputDataBlockDesc
);
pInfo
->
pRes
=
createResDataBlock
(
pScanNode
->
scan
.
node
.
pOutputDataBlockDesc
);
int32_t
numOfCols
=
0
;
pInfo
->
pColMatchInfo
=
extractColMatchInfo
(
pScanNode
->
scan
.
pScanCols
,
pScanNode
->
scan
.
node
.
pOutputDataBlockDesc
,
&
numOfCols
,
COL_MATCH_FROM_COL_ID
);
int32_t
code
=
extractTargetSlotId
(
pInfo
->
pColMatchInfo
,
pTaskInfo
,
&
pInfo
->
pSlotIds
);
code
=
extractTargetSlotId
(
pInfo
->
pColMatchInfo
,
pTaskInfo
,
&
pInfo
->
pSlotIds
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
@@ -55,13 +58,17 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead
// partition by tbname
if
(
taosArrayGetSize
(
pTableList
->
pGroupList
)
==
taosArrayGetSize
(
pTableList
->
pTableList
))
{
pInfo
->
retrieveType
=
LASTROW_RETRIEVE_TYPE_ALL
;
tsdbLastRowReaderOpen
(
pInfo
->
readHandle
.
vnode
,
pInfo
->
retrieveType
,
pTableList
->
pTableList
,
taosArrayGetSize
(
pInfo
->
pColMatchInfo
),
&
pInfo
->
pLastrowReader
);
pInfo
->
retrieveType
=
CACHESCAN_RETRIEVE_TYPE_ALL
|
CACHESCAN_RETRIEVE_LAST_ROW
;
code
=
tsdbCacherowsReaderOpen
(
pInfo
->
readHandle
.
vnode
,
pInfo
->
retrieveType
,
pTableList
->
pTableList
,
taosArrayGetSize
(
pInfo
->
pColMatchInfo
),
&
pInfo
->
pLastrowReader
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
pInfo
->
pBufferredRes
=
createOneDataBlock
(
pInfo
->
pRes
,
false
);
blockDataEnsureCapacity
(
pInfo
->
pBufferredRes
,
pOperator
->
resultInfo
.
capacity
);
}
else
{
// by tags
pInfo
->
retrieveType
=
LASTROW_RETRIEVE_TYPE_SINGLE
;
pInfo
->
retrieveType
=
CACHESCAN_RETRIEVE_TYPE_SINGLE
|
CACHESCAN_RETRIEVE_LAST_ROW
;
}
if
(
pScanNode
->
scan
.
pScanPseudoCols
!=
NULL
)
{
...
...
@@ -80,19 +87,19 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead
pOperator
->
exprSupp
.
numOfExprs
=
taosArrayGetSize
(
pInfo
->
pRes
->
pDataBlock
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doScan
Lastrow
,
NULL
,
NULL
,
destroyLastrowScanOperator
,
NULL
,
NULL
,
NULL
);
createOperatorFpSet
(
operatorDummyOpenFn
,
doScan
Cache
,
NULL
,
NULL
,
destroyLastrowScanOperator
,
NULL
,
NULL
,
NULL
);
pOperator
->
cost
.
openCost
=
0
;
return
pOperator
;
_error:
pTaskInfo
->
code
=
TSDB_CODE_OUT_OF_MEMORY
;
taosMemoryFree
(
pInfo
);
pTaskInfo
->
code
=
code
;
destroyLastrowScanOperator
(
pInfo
);
taosMemoryFree
(
pOperator
);
return
NULL
;
}
SSDataBlock
*
doScan
Lastrow
(
SOperatorInfo
*
pOperator
)
{
SSDataBlock
*
doScan
Cache
(
SOperatorInfo
*
pOperator
)
{
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
...
...
@@ -109,14 +116,14 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) {
blockDataCleanup
(
pInfo
->
pRes
);
// check if it is a group by tbname
if
(
pInfo
->
retrieveType
==
LASTROW
_RETRIEVE_TYPE_ALL
)
{
if
(
(
pInfo
->
retrieveType
&
CACHESCAN_RETRIEVE_TYPE_ALL
)
==
CACHESCAN
_RETRIEVE_TYPE_ALL
)
{
if
(
pInfo
->
indexOfBufferedRes
>=
pInfo
->
pBufferredRes
->
info
.
rows
)
{
blockDataCleanup
(
pInfo
->
pBufferredRes
);
taosArrayClear
(
pInfo
->
pUidList
);
int32_t
code
=
tsdbRetrieve
LastRow
(
pInfo
->
pLastrowReader
,
pInfo
->
pBufferredRes
,
pInfo
->
pSlotIds
,
pInfo
->
pUidList
);
int32_t
code
=
tsdbRetrieve
CacheRows
(
pInfo
->
pLastrowReader
,
pInfo
->
pBufferredRes
,
pInfo
->
pSlotIds
,
pInfo
->
pUidList
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
// check for tag values
...
...
@@ -172,11 +179,11 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) {
while
(
pInfo
->
currentGroupIndex
<
totalGroups
)
{
SArray
*
pGroupTableList
=
taosArrayGetP
(
pTableList
->
pGroupList
,
pInfo
->
currentGroupIndex
);
tsdb
LastRow
ReaderOpen
(
pInfo
->
readHandle
.
vnode
,
pInfo
->
retrieveType
,
pGroupTableList
,
tsdb
Cacherows
ReaderOpen
(
pInfo
->
readHandle
.
vnode
,
pInfo
->
retrieveType
,
pGroupTableList
,
taosArrayGetSize
(
pInfo
->
pColMatchInfo
),
&
pInfo
->
pLastrowReader
);
taosArrayClear
(
pInfo
->
pUidList
);
int32_t
code
=
tsdbRetrieve
LastRow
(
pInfo
->
pLastrowReader
,
pInfo
->
pRes
,
pInfo
->
pSlotIds
,
pInfo
->
pUidList
);
int32_t
code
=
tsdbRetrieve
CacheRows
(
pInfo
->
pLastrowReader
,
pInfo
->
pRes
,
pInfo
->
pSlotIds
,
pInfo
->
pUidList
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
}
...
...
@@ -200,7 +207,7 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) {
}
}
tsdb
Lastrow
ReaderClose
(
pInfo
->
pLastrowReader
);
tsdb
Cacherows
ReaderClose
(
pInfo
->
pLastrowReader
);
return
pInfo
->
pRes
;
}
}
...
...
source/libs/executor/src/executil.c
浏览文件 @
c5812127
...
...
@@ -1217,7 +1217,6 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
pCtx
->
start
.
key
=
INT64_MIN
;
pCtx
->
end
.
key
=
INT64_MIN
;
pCtx
->
numOfParams
=
pExpr
->
base
.
numOfParams
;
pCtx
->
increase
=
false
;
pCtx
->
isStream
=
false
;
pCtx
->
param
=
pFunct
->
pParam
;
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
c5812127
...
...
@@ -184,7 +184,7 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int
// in the first scan, new space needed for results
int32_t
pageId
=
-
1
;
SIDList
list
=
getDataBufPagesIdList
(
pResultBuf
,
tableGroupId
);
SIDList
list
=
getDataBufPagesIdList
(
pResultBuf
);
if
(
taosArrayGetSize
(
list
)
==
0
)
{
pData
=
getNewBufPage
(
pResultBuf
,
tableGroupId
,
&
pageId
);
...
...
@@ -299,7 +299,7 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes
// in the first scan, new space needed for results
int32_t
pageId
=
-
1
;
SIDList
list
=
getDataBufPagesIdList
(
pResultBuf
,
tid
);
SIDList
list
=
getDataBufPagesIdList
(
pResultBuf
);
if
(
taosArrayGetSize
(
list
)
==
0
)
{
pData
=
getNewBufPage
(
pResultBuf
,
tid
,
&
pageId
);
...
...
@@ -1565,16 +1565,8 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
// the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
slotId
);
char
*
in
=
GET_ROWCELL_INTERBUF
(
pCtx
[
j
].
resultInfo
);
if
(
pCtx
[
j
].
increase
)
{
int64_t
ts
=
*
(
int64_t
*
)
in
;
for
(
int32_t
k
=
0
;
k
<
pRow
->
numOfRows
;
++
k
)
{
colDataAppend
(
pColInfoData
,
pBlock
->
info
.
rows
+
k
,
(
const
char
*
)
&
ts
,
pCtx
[
j
].
resultInfo
->
isNullRes
);
ts
++
;
}
}
else
{
for
(
int32_t
k
=
0
;
k
<
pRow
->
numOfRows
;
++
k
)
{
colDataAppend
(
pColInfoData
,
pBlock
->
info
.
rows
+
k
,
in
,
pCtx
[
j
].
resultInfo
->
isNullRes
);
}
for
(
int32_t
k
=
0
;
k
<
pRow
->
numOfRows
;
++
k
)
{
colDataAppend
(
pColInfoData
,
pBlock
->
info
.
rows
+
k
,
in
,
pCtx
[
j
].
resultInfo
->
isNullRes
);
}
}
}
...
...
@@ -4137,7 +4129,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return
NULL
;
}
pOperator
=
create
Lastrow
ScanOperator
(
pScanNode
,
pHandle
,
pTaskInfo
);
pOperator
=
create
Cacherows
ScanOperator
(
pScanNode
,
pHandle
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_PROJECT
==
type
)
{
pOperator
=
createProjectOperatorInfo
(
NULL
,
(
SProjectPhysiNode
*
)
pPhyNode
,
pTaskInfo
);
}
else
{
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
c5812127
...
...
@@ -1830,7 +1830,7 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
void
increaseTs
(
SqlFunctionCtx
*
pCtx
)
{
if
(
pCtx
[
0
].
pExpr
->
pExpr
->
_function
.
pFunctNode
->
funcType
==
FUNCTION_TYPE_WSTART
)
{
pCtx
[
0
].
increase
=
true
;
//
pCtx[0].increase = true;
}
}
...
...
source/libs/executor/src/tsort.c
浏览文件 @
c5812127
...
...
@@ -97,7 +97,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
return
pSortHandle
;
}
static
int32_t
sortComparClea
r
up
(
SMsortComparParam
*
cmpParam
)
{
static
int32_t
sortComparClea
n
up
(
SMsortComparParam
*
cmpParam
)
{
for
(
int32_t
i
=
0
;
i
<
cmpParam
->
numOfSources
;
++
i
)
{
SSortSource
*
pSource
=
cmpParam
->
pSources
[
i
];
// NOTICE: pSource may be SGenericSource *, if it is SORT_MULTISOURCE_MERGE
blockDataDestroy
(
pSource
->
src
.
pBlock
);
...
...
@@ -134,15 +134,14 @@ int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
doAddNewExternalMemSource
(
SDiskbasedBuf
*
pBuf
,
SArray
*
pAllSources
,
SSDataBlock
*
pBlock
,
int32_t
*
sourceId
)
{
static
int32_t
doAddNewExternalMemSource
(
SDiskbasedBuf
*
pBuf
,
SArray
*
pAllSources
,
SSDataBlock
*
pBlock
,
int32_t
*
sourceId
,
SArray
*
pPageIdList
)
{
SSortSource
*
pSource
=
taosMemoryCalloc
(
1
,
sizeof
(
SSortSource
));
if
(
pSource
==
NULL
)
{
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
pSource
->
pageIdList
=
getDataBufPagesIdList
(
pBuf
,
(
*
sourceId
));
pSource
->
src
.
pBlock
=
pBlock
;
pSource
->
pageIdList
=
pPageIdList
;
taosArrayPush
(
pAllSources
,
&
pSource
);
(
*
sourceId
)
+=
1
;
...
...
@@ -171,6 +170,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
}
}
SArray
*
pPageIdList
=
taosArrayInit
(
4
,
sizeof
(
int32_t
));
while
(
start
<
pDataBlock
->
info
.
rows
)
{
int32_t
stop
=
0
;
blockDataSplitRows
(
pDataBlock
,
pDataBlock
->
info
.
hasVarCol
,
start
,
&
stop
,
pHandle
->
pageSize
);
...
...
@@ -186,6 +186,8 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
return
terrno
;
}
taosArrayPush
(
pPageIdList
,
&
pageId
);
int32_t
size
=
blockDataGetSize
(
p
)
+
sizeof
(
int32_t
)
+
taosArrayGetSize
(
p
->
pDataBlock
)
*
sizeof
(
int32_t
);
assert
(
size
<=
getBufPageSize
(
pHandle
->
pBuf
));
...
...
@@ -201,7 +203,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
blockDataCleanup
(
pDataBlock
);
SSDataBlock
*
pBlock
=
createOneDataBlock
(
pDataBlock
,
false
);
return
doAddNewExternalMemSource
(
pHandle
->
pBuf
,
pHandle
->
pOrderedSource
,
pBlock
,
&
pHandle
->
sourceId
);
return
doAddNewExternalMemSource
(
pHandle
->
pBuf
,
pHandle
->
pOrderedSource
,
pBlock
,
&
pHandle
->
sourceId
,
pPageIdList
);
}
static
void
setCurrentSourceIsDone
(
SSortSource
*
pSource
,
SSortHandle
*
pHandle
)
{
...
...
@@ -502,6 +504,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
return
code
;
}
SArray
*
pPageIdList
=
taosArrayInit
(
4
,
sizeof
(
int32_t
));
while
(
1
)
{
SSDataBlock
*
pDataBlock
=
getSortedBlockDataInner
(
pHandle
,
&
pHandle
->
cmpParam
,
numOfRows
);
if
(
pDataBlock
==
NULL
)
{
...
...
@@ -514,6 +517,8 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
return
terrno
;
}
taosArrayPush
(
pPageIdList
,
&
pageId
);
int32_t
size
=
blockDataGetSize
(
pDataBlock
)
+
sizeof
(
int32_t
)
+
taosArrayGetSize
(
pDataBlock
->
pDataBlock
)
*
sizeof
(
int32_t
);
assert
(
size
<=
getBufPageSize
(
pHandle
->
pBuf
));
...
...
@@ -525,12 +530,12 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
blockDataCleanup
(
pDataBlock
);
}
sortComparClea
r
up
(
&
pHandle
->
cmpParam
);
sortComparClea
n
up
(
&
pHandle
->
cmpParam
);
tMergeTreeDestroy
(
pHandle
->
pMergeTree
);
pHandle
->
numOfCompletedSources
=
0
;
SSDataBlock
*
pBlock
=
createOneDataBlock
(
pHandle
->
pDataBlock
,
false
);
code
=
doAddNewExternalMemSource
(
pHandle
->
pBuf
,
pResList
,
pBlock
,
&
pHandle
->
sourceId
);
code
=
doAddNewExternalMemSource
(
pHandle
->
pBuf
,
pResList
,
pBlock
,
&
pHandle
->
sourceId
,
pPageIdList
);
if
(
code
!=
0
)
{
return
code
;
}
...
...
source/libs/function/inc/tpercentile.h
浏览文件 @
c5812127
...
...
@@ -51,20 +51,20 @@ struct tMemBucket;
typedef
int32_t
(
*
__perc_hash_func_t
)(
struct
tMemBucket
*
pBucket
,
const
void
*
value
);
typedef
struct
tMemBucket
{
int16_t
numOfSlots
;
int16_t
type
;
int16_t
bytes
;
int32_t
total
;
int32_t
elemPerPage
;
// number of elements for each object
int32_t
maxCapacity
;
// maximum allowed number of elements that can be sort directly to get the result
int32_t
bufPageSize
;
// disk page size
MinMaxEntry
range
;
// value range
int32_t
times
;
// count that has been checked for deciding the correct data value buckets.
__compar_fn_t
comparFn
;
tMemBucketSlot
*
pSlots
;
SDiskbasedBuf
*
pBuffer
;
__perc_hash_func_t
hashFunc
;
int16_t
numOfSlots
;
int16_t
type
;
int16_t
bytes
;
int32_t
total
;
int32_t
elemPerPage
;
// number of elements for each object
int32_t
maxCapacity
;
// maximum allowed number of elements that can be sort directly to get the result
int32_t
bufPageSize
;
// disk page size
MinMaxEntry
range
;
// value range
int32_t
times
;
// count that has been checked for deciding the correct data value buckets.
__compar_fn_t
comparFn
;
tMemBucketSlot
*
pSlots
;
SDiskbasedBuf
*
pBuffer
;
__perc_hash_func_t
hashFunc
;
SHashObj
*
groupPagesMap
;
// disk page map for different groups
;
}
tMemBucket
;
tMemBucket
*
tMemBucketCreate
(
int16_t
nElemSize
,
int16_t
dataType
,
double
minval
,
double
maxval
);
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
c5812127
...
...
@@ -3643,7 +3643,7 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo
*
pEntryInfo
=
GET_RES_INFO
(
pCtx
);
STopBotRes
*
pRes
=
getTopBotOutputInfo
(
pCtx
);
int16_t
type
=
pCtx
->
input
.
pData
[
0
]
->
info
.
type
;
int16_t
type
=
pCtx
->
pExpr
->
base
.
resSchema
.
type
;
int32_t
slotId
=
pCtx
->
pExpr
->
base
.
resSchema
.
slotId
;
SColumnInfoData
*
pCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
slotId
);
...
...
source/libs/function/src/tpercentile.c
浏览文件 @
c5812127
...
...
@@ -33,13 +33,13 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx)
SFilePage
*
buffer
=
(
SFilePage
*
)
taosMemoryCalloc
(
1
,
pMemBucket
->
bytes
*
pMemBucket
->
pSlots
[
slotIdx
].
info
.
size
+
sizeof
(
SFilePage
));
int32_t
groupId
=
getGroupId
(
pMemBucket
->
numOfSlots
,
slotIdx
,
pMemBucket
->
times
);
S
IDList
list
=
getDataBufPagesIdList
(
pMemBucket
->
pBuffer
,
groupId
);
S
Array
*
pIdList
=
*
(
SArray
**
)
taosHashGet
(
pMemBucket
->
groupPagesMap
,
&
groupId
,
sizeof
(
groupId
)
);
int32_t
offset
=
0
;
for
(
int32_t
i
=
0
;
i
<
list
->
size
;
++
i
)
{
struct
SPageInfo
*
pgInfo
=
*
(
struct
SPageInfo
**
)
taosArrayGet
(
l
ist
,
i
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pIdList
)
;
++
i
)
{
int32_t
*
pageId
=
taosArrayGet
(
pIdL
ist
,
i
);
SFilePage
*
pg
=
getBufPage
(
pMemBucket
->
pBuffer
,
getPageId
(
pgInfo
)
);
SFilePage
*
pg
=
getBufPage
(
pMemBucket
->
pBuffer
,
*
pageId
);
memcpy
(
buffer
->
data
+
offset
,
pg
->
data
,
(
size_t
)(
pg
->
num
*
pMemBucket
->
bytes
));
offset
+=
(
int32_t
)(
pg
->
num
*
pMemBucket
->
bytes
);
...
...
@@ -97,11 +97,11 @@ double findOnlyResult(tMemBucket *pMemBucket) {
}
int32_t
groupId
=
getGroupId
(
pMemBucket
->
numOfSlots
,
i
,
pMemBucket
->
times
);
S
IDList
list
=
getDataBufPagesIdList
(
pMemBucket
->
pBuffer
,
groupId
);
S
Array
*
list
=
*
(
SArray
**
)
taosHashGet
(
pMemBucket
->
groupPagesMap
,
&
groupId
,
sizeof
(
groupId
)
);
assert
(
list
->
size
==
1
);
struct
SPageInfo
*
pgInfo
=
(
struct
SPageInfo
*
)
taosArrayGetP
(
list
,
0
);
SFilePage
*
pPage
=
getBufPage
(
pMemBucket
->
pBuffer
,
getPageId
(
pgInfo
)
);
int32_t
*
pageId
=
taosArrayGet
(
list
,
0
);
SFilePage
*
pPage
=
getBufPage
(
pMemBucket
->
pBuffer
,
*
pageId
);
assert
(
pPage
->
num
==
1
);
double
v
=
0
;
...
...
@@ -233,7 +233,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
pBucket
->
times
=
1
;
pBucket
->
maxCapacity
=
200000
;
pBucket
->
groupPagesMap
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
,
HASH_NO_LOCK
);
if
(
setBoundingBox
(
&
pBucket
->
range
,
pBucket
->
type
,
minval
,
maxval
)
!=
0
)
{
// qError("MemBucket:%p, invalid value range: %f-%f", pBucket, minval, maxval);
taosMemoryFree
(
pBucket
);
...
...
@@ -280,8 +280,16 @@ void tMemBucketDestroy(tMemBucket *pBucket) {
return
;
}
void
*
p
=
taosHashIterate
(
pBucket
->
groupPagesMap
,
NULL
);
while
(
p
)
{
SArray
**
p1
=
p
;
p
=
taosHashIterate
(
pBucket
->
groupPagesMap
,
p
);
taosArrayDestroy
(
*
p1
);
}
destroyDiskbasedBuf
(
pBucket
->
pBuffer
);
taosMemoryFreeClear
(
pBucket
->
pSlots
);
taosHashCleanup
(
pBucket
->
groupPagesMap
);
taosMemoryFreeClear
(
pBucket
);
}
...
...
@@ -357,8 +365,16 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
pSlot
->
info
.
data
=
NULL
;
}
SArray
*
pPageIdList
=
(
SArray
*
)
taosHashGet
(
pBucket
->
groupPagesMap
,
&
groupId
,
sizeof
(
groupId
));
if
(
pPageIdList
==
NULL
)
{
SArray
*
pList
=
taosArrayInit
(
4
,
sizeof
(
int32_t
));
taosHashPut
(
pBucket
->
groupPagesMap
,
&
groupId
,
sizeof
(
groupId
),
&
pList
,
POINTER_BYTES
);
pPageIdList
=
pList
;
}
pSlot
->
info
.
data
=
getNewBufPage
(
pBucket
->
pBuffer
,
groupId
,
&
pageId
);
pSlot
->
info
.
pageId
=
pageId
;
taosArrayPush
(
pPageIdList
,
&
pageId
);
}
memcpy
(
pSlot
->
info
.
data
->
data
+
pSlot
->
info
.
data
->
num
*
pBucket
->
bytes
,
d
,
pBucket
->
bytes
);
...
...
@@ -476,7 +492,7 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction)
resetSlotInfo
(
pMemBucket
);
int32_t
groupId
=
getGroupId
(
pMemBucket
->
numOfSlots
,
i
,
pMemBucket
->
times
-
1
);
SIDList
list
=
getDataBufPagesIdList
(
pMemBucket
->
pBuffer
,
groupId
);
SIDList
list
=
taosHashGet
(
pMemBucket
->
groupPagesMap
,
&
groupId
,
sizeof
(
groupId
)
);
assert
(
list
->
size
>
0
);
for
(
int32_t
f
=
0
;
f
<
list
->
size
;
++
f
)
{
...
...
source/util/src/tpagedbuf.c
浏览文件 @
c5812127
...
...
@@ -33,7 +33,7 @@ struct SDiskbasedBuf {
int32_t
pageSize
;
// current used page size
int32_t
inMemPages
;
// numOfPages that are allocated in memory
SList
*
freePgList
;
// free page list
S
HashObj
*
groupSet
;
// id hash table, todo remove i
t
S
Array
*
pIdList
;
// page id lis
t
SHashObj
*
all
;
SList
*
lruList
;
void
*
emptyDummyIdList
;
// dummy id list
...
...
@@ -241,26 +241,7 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
return
0
;
}
static
SIDList
addNewGroup
(
SDiskbasedBuf
*
pBuf
,
int32_t
groupId
)
{
assert
(
taosHashGet
(
pBuf
->
groupSet
,
(
const
char
*
)
&
groupId
,
sizeof
(
int32_t
))
==
NULL
);
SArray
*
pa
=
taosArrayInit
(
1
,
POINTER_BYTES
);
int32_t
ret
=
taosHashPut
(
pBuf
->
groupSet
,
(
const
char
*
)
&
groupId
,
sizeof
(
int32_t
),
&
pa
,
POINTER_BYTES
);
assert
(
ret
==
0
);
return
pa
;
}
static
SPageInfo
*
registerPage
(
SDiskbasedBuf
*
pBuf
,
int32_t
groupId
,
int32_t
pageId
)
{
SIDList
list
=
NULL
;
char
**
p
=
taosHashGet
(
pBuf
->
groupSet
,
(
const
char
*
)
&
groupId
,
sizeof
(
int32_t
));
if
(
p
==
NULL
)
{
// it is a new group id
list
=
addNewGroup
(
pBuf
,
groupId
);
}
else
{
list
=
(
SIDList
)(
*
p
);
}
static
SPageInfo
*
registerPage
(
SDiskbasedBuf
*
pBuf
,
int32_t
pageId
)
{
pBuf
->
numOfPages
+=
1
;
SPageInfo
*
ppi
=
taosMemoryMalloc
(
sizeof
(
SPageInfo
));
...
...
@@ -273,7 +254,7 @@ static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t pag
ppi
->
pn
=
NULL
;
ppi
->
dirty
=
false
;
return
*
(
SPageInfo
**
)
taosArrayPush
(
l
ist
,
&
ppi
);
return
*
(
SPageInfo
**
)
taosArrayPush
(
pBuf
->
pIdL
ist
,
&
ppi
);
}
static
SListNode
*
getEldestUnrefedPage
(
SDiskbasedBuf
*
pBuf
)
{
...
...
@@ -293,16 +274,6 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
}
}
// int32_t pos = listNEles(pBuf->lruList);
// SListIter iter1 = {0};
// tdListInitIter(pBuf->lruList, &iter1, TD_LIST_BACKWARD);
// SListNode* pn1 = NULL;
// while((pn1 = tdListNext(&iter1)) != NULL) {
// SPageInfo* pageInfo = *(SPageInfo**) pn1->data;
// printf("page %d is used, dirty:%d, pos:%d\n", pageInfo->pageId, pageInfo->dirty, pos - 1);
// pos -= 1;
// }
return
pn
;
}
...
...
@@ -382,7 +353,8 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
// init id hash table
_hash_fn_t
fn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
);
pPBuf
->
groupSet
=
taosHashInit
(
10
,
fn
,
true
,
false
);
pPBuf
->
pIdList
=
taosArrayInit
(
4
,
POINTER_BYTES
);
pPBuf
->
assistBuf
=
taosMemoryMalloc
(
pPBuf
->
pageSize
+
2
);
// EXTRA BYTES
pPBuf
->
all
=
taosHashInit
(
10
,
fn
,
true
,
false
);
...
...
@@ -425,7 +397,7 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) {
*
pageId
=
(
++
pBuf
->
allocateId
);
// register page id info
pi
=
registerPage
(
pBuf
,
groupId
,
*
pageId
);
pi
=
registerPage
(
pBuf
,
*
pageId
);
// add to hash map
taosHashPut
(
pBuf
->
all
,
pageId
,
sizeof
(
int32_t
),
&
pi
,
POINTER_BYTES
);
...
...
@@ -526,19 +498,11 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
pBuf
->
statis
.
releasePages
+=
1
;
}
size_t
getNumOfBufGroupId
(
const
SDiskbasedBuf
*
pBuf
)
{
return
taosHashGetSize
(
pBuf
->
groupSet
);
}
size_t
getTotalBufSize
(
const
SDiskbasedBuf
*
pBuf
)
{
return
(
size_t
)
pBuf
->
totalBufSize
;
}
SIDList
getDataBufPagesIdList
(
SDiskbasedBuf
*
pBuf
,
int32_t
groupId
)
{
assert
(
pBuf
!=
NULL
);
char
**
p
=
taosHashGet
(
pBuf
->
groupSet
,
(
const
char
*
)
&
groupId
,
sizeof
(
int32_t
));
if
(
p
==
NULL
)
{
// it is a new group id
return
pBuf
->
emptyDummyIdList
;
}
else
{
return
(
SArray
*
)(
*
p
);
}
SIDList
getDataBufPagesIdList
(
SDiskbasedBuf
*
pBuf
)
{
ASSERT
(
pBuf
!=
NULL
);
return
pBuf
->
pIdList
;
}
void
destroyDiskbasedBuf
(
SDiskbasedBuf
*
pBuf
)
{
...
...
@@ -578,26 +542,21 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
taosRemoveFile
(
pBuf
->
path
);
taosMemoryFreeClear
(
pBuf
->
path
);
SArray
**
p
=
taosHashIterate
(
pBuf
->
groupSet
,
NULL
);
while
(
p
)
{
size_t
n
=
taosArrayGetSize
(
*
p
);
for
(
int32_t
i
=
0
;
i
<
n
;
++
i
)
{
SPageInfo
*
pi
=
taosArrayGetP
(
*
p
,
i
);
taosMemoryFreeClear
(
pi
->
pData
);
taosMemoryFreeClear
(
pi
);
}
taosArrayDestroy
(
*
p
);
p
=
taosHashIterate
(
pBuf
->
groupSet
,
p
);
size_t
n
=
taosArrayGetSize
(
pBuf
->
pIdList
);
for
(
int32_t
i
=
0
;
i
<
n
;
++
i
)
{
SPageInfo
*
pi
=
taosArrayGetP
(
pBuf
->
pIdList
,
i
);
taosMemoryFreeClear
(
pi
->
pData
);
taosMemoryFreeClear
(
pi
);
}
taosArrayDestroy
(
pBuf
->
pIdList
);
tdListFree
(
pBuf
->
lruList
);
tdListFree
(
pBuf
->
freePgList
);
taosArrayDestroy
(
pBuf
->
emptyDummyIdList
);
taosArrayDestroy
(
pBuf
->
pFree
);
taosHashCleanup
(
pBuf
->
groupSet
);
taosHashCleanup
(
pBuf
->
all
);
taosMemoryFreeClear
(
pBuf
->
id
);
...
...
@@ -661,32 +620,32 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) {
pBuf
->
totalBufSize
/
1024
.
0
,
pBuf
->
numOfPages
,
listNEles
(
pBuf
->
lruList
)
*
pBuf
->
pageSize
/
1024
.
0
,
listNEles
(
pBuf
->
lruList
),
pBuf
->
fileSize
/
1024
.
0
,
pBuf
->
pageSize
/
1024
.
0
f
,
pBuf
->
id
);
printf
(
"Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f Kb
\n
"
,
ps
->
getPages
,
ps
->
releasePages
,
ps
->
flushBytes
/
1024
.
0
f
,
ps
->
flushPages
,
ps
->
loadBytes
/
1024
.
0
f
,
ps
->
loadPages
,
ps
->
loadBytes
/
(
1024
.
0
*
ps
->
loadPages
));
if
(
ps
->
loadPages
>
0
)
{
printf
(
"Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f Kb
\n
"
,
ps
->
getPages
,
ps
->
releasePages
,
ps
->
flushBytes
/
1024
.
0
f
,
ps
->
flushPages
,
ps
->
loadBytes
/
1024
.
0
f
,
ps
->
loadPages
,
ps
->
loadBytes
/
(
1024
.
0
*
ps
->
loadPages
));
}
else
{
printf
(
"no page loaded
\n
"
);
}
}
void
clearDiskbasedBuf
(
SDiskbasedBuf
*
pBuf
)
{
SArray
**
p
=
taosHashIterate
(
pBuf
->
groupSet
,
NULL
);
while
(
p
)
{
size_t
n
=
taosArrayGetSize
(
*
p
);
for
(
int32_t
i
=
0
;
i
<
n
;
++
i
)
{
SPageInfo
*
pi
=
taosArrayGetP
(
*
p
,
i
);
taosMemoryFreeClear
(
pi
->
pData
);
taosMemoryFreeClear
(
pi
);
}
taosArrayDestroy
(
*
p
);
p
=
taosHashIterate
(
pBuf
->
groupSet
,
p
);
size_t
n
=
taosArrayGetSize
(
pBuf
->
pIdList
);
for
(
int32_t
i
=
0
;
i
<
n
;
++
i
)
{
SPageInfo
*
pi
=
taosArrayGetP
(
pBuf
->
pIdList
,
i
);
taosMemoryFreeClear
(
pi
->
pData
);
taosMemoryFreeClear
(
pi
);
}
taosArrayClear
(
pBuf
->
pIdList
);
tdListEmpty
(
pBuf
->
lruList
);
tdListEmpty
(
pBuf
->
freePgList
);
taosArrayClear
(
pBuf
->
emptyDummyIdList
);
taosArrayClear
(
pBuf
->
pFree
);
taosHashClear
(
pBuf
->
groupSet
);
taosHashClear
(
pBuf
->
all
);
pBuf
->
numOfPages
=
0
;
// all pages are in buffer in the first place
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录