Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
b04e3da3
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
b04e3da3
编写于
7月 27, 2020
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[td-225] fix bugs in res buf
上级
10b9968d
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
390 addition
and
170 deletion
+390
-170
src/client/src/tscFunctionImpl.c
src/client/src/tscFunctionImpl.c
+5
-0
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+2
-2
src/query/inc/qExtbuffer.h
src/query/inc/qExtbuffer.h
+1
-1
src/query/inc/qResultbuf.h
src/query/inc/qResultbuf.h
+24
-2
src/query/inc/qUtil.h
src/query/inc/qUtil.h
+4
-3
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+34
-30
src/query/src/qResultbuf.c
src/query/src/qResultbuf.c
+164
-128
src/query/src/qUtil.c
src/query/src/qUtil.c
+9
-4
src/query/tests/resultBufferTest.cpp
src/query/tests/resultBufferTest.cpp
+53
-0
src/util/inc/tlist.h
src/util/inc/tlist.h
+2
-0
src/util/src/tlist.c
src/util/src/tlist.c
+16
-0
tests/script/general/parser/testSuite.sim
tests/script/general/parser/testSuite.sim
+2
-0
tests/script/general/parser/topbot.sim
tests/script/general/parser/topbot.sim
+74
-0
未找到文件。
src/client/src/tscFunctionImpl.c
浏览文件 @
b04e3da3
...
...
@@ -2131,6 +2131,11 @@ static STopBotInfo *getTopBotOutputInfo(SQLFunctionCtx *pCtx) {
}
bool
topbot_datablock_filter
(
SQLFunctionCtx
*
pCtx
,
int32_t
functionId
,
const
char
*
minval
,
const
char
*
maxval
)
{
SResultInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
if
(
pResInfo
==
NULL
)
{
return
true
;
}
STopBotInfo
*
pTopBotInfo
=
getTopBotOutputInfo
(
pCtx
);
// required number of results are not reached, continue load data block
...
...
src/query/inc/qExecutor.h
浏览文件 @
b04e3da3
...
...
@@ -42,8 +42,8 @@ typedef struct SSqlGroupbyExpr {
}
SSqlGroupbyExpr
;
typedef
struct
SPosInfo
{
int
16
_t
pageId
;
int
16
_t
rowId
;
int
32
_t
pageId
;
int
32
_t
rowId
;
}
SPosInfo
;
typedef
struct
SWindowStatus
{
...
...
src/query/inc/qExtbuffer.h
浏览文件 @
b04e3da3
...
...
@@ -28,7 +28,7 @@ extern "C" {
#include "tdataformat.h"
#include "talgo.h"
#define DEFAULT_PAGE_SIZE (1024L*4) // 16k larger than the SHistoInfo
#define DEFAULT_PAGE_SIZE (1024L*
6
4) // 16k larger than the SHistoInfo
#define MAX_TMPFILE_PATH_LENGTH PATH_MAX
#define INITIAL_ALLOCATION_BUFFER_SIZE 64
...
...
src/query/inc/qResultbuf.h
浏览文件 @
b04e3da3
...
...
@@ -34,10 +34,11 @@ typedef struct SPageDiskInfo {
}
SPageDiskInfo
;
typedef
struct
SPageInfo
{
SListNode
*
pn
;
// point to list node
int32_t
pageId
;
SPageDiskInfo
info
;
void
*
pData
;
T_REF_DECLARE
();
bool
used
;
// set current page is in used
}
SPageInfo
;
typedef
struct
SFreeListItem
{
...
...
@@ -45,6 +46,15 @@ typedef struct SFreeListItem {
int32_t
len
;
}
SFreeListItem
;
typedef
struct
SResultBufStatis
{
int32_t
flushBytes
;
int32_t
loadBytes
;
int32_t
getPages
;
int32_t
releasePages
;
int32_t
flushPages
;
int32_t
fileSize
;
}
SResultBufStatis
;
typedef
struct
SDiskbasedResultBuf
{
int32_t
numOfRowsPerPage
;
int32_t
numOfPages
;
...
...
@@ -64,6 +74,8 @@ typedef struct SDiskbasedResultBuf {
void
*
assistBuf
;
// assistant buffer for compress data
SArray
*
pFree
;
// free area in file
int32_t
nextPos
;
// next page flush position
SResultBufStatis
statis
;
}
SDiskbasedResultBuf
;
#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L)
...
...
@@ -119,6 +131,16 @@ tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id);
*/
void
releaseResBufPage
(
SDiskbasedResultBuf
*
pResultBuf
,
void
*
page
);
void
releaseResBufPageInfo
(
SDiskbasedResultBuf
*
pResultBuf
,
SPageInfo
*
pi
);
/**
*
* @param pResultBuf
* @param id
* @return
*/
//tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id);
/**
* get the total buffer size in the format of disk file
* @param pResultBuf
...
...
@@ -144,7 +166,7 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle);
* @param pList
* @return
*/
int32_t
getLastPageId
(
SIDList
pList
);
SPageInfo
*
getLastPageInfo
(
SIDList
pList
);
#ifdef __cplusplus
}
...
...
src/query/inc/qUtil.h
浏览文件 @
b04e3da3
...
...
@@ -45,13 +45,14 @@ bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot);
int32_t
createQueryResultInfo
(
SQuery
*
pQuery
,
SWindowResult
*
pResultRow
,
bool
isSTableQuery
,
size_t
interBufSize
);
static
FORCE_INLINE
char
*
getPosInResultPage
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
columnIndex
,
SWindowResult
*
pResult
)
{
static
FORCE_INLINE
char
*
getPosInResultPage
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
columnIndex
,
SWindowResult
*
pResult
,
tFilePage
*
page
)
{
assert
(
pResult
!=
NULL
&&
pRuntimeEnv
!=
NULL
);
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pResult
->
pos
.
pageId
);
int32_t
realRowId
=
pResult
->
pos
.
rowId
*
GET_ROW_PARAM_FOR_MULTIOUTPUT
(
pQuery
,
pRuntimeEnv
->
topBotQuery
,
pRuntimeEnv
->
stableQuery
);
// tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pos.pageId);
int32_t
realRowId
=
pResult
->
pos
.
rowId
*
GET_ROW_PARAM_FOR_MULTIOUTPUT
(
pQuery
,
pRuntimeEnv
->
topBotQuery
,
pRuntimeEnv
->
stableQuery
);
return
((
char
*
)
page
->
data
)
+
pRuntimeEnv
->
offset
[
columnIndex
]
*
pRuntimeEnv
->
numOfRowsPerPage
+
pQuery
->
pSelectExpr
[
columnIndex
].
bytes
*
realRowId
;
}
...
...
src/query/src/qExecutor.c
浏览文件 @
b04e3da3
...
...
@@ -221,7 +221,7 @@ void updateNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfRes) {
}
static
int32_t
getGroupResultId
(
int32_t
groupIndex
)
{
int32_t
base
=
200000
;
int32_t
base
=
200000
00
;
return
base
+
(
groupIndex
*
10000
);
}
...
...
@@ -478,10 +478,14 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult
if
(
taosArrayGetSize
(
list
)
==
0
)
{
pData
=
getNewDataBuf
(
pResultBuf
,
sid
,
&
pageId
);
}
else
{
pageId
=
getLastPageId
(
list
);
pData
=
getResBufPage
(
pResultBuf
,
pageId
);
SPageInfo
*
pi
=
getLastPageInfo
(
list
);
pData
=
getResBufPage
(
pResultBuf
,
pi
->
pageId
);
pageId
=
pi
->
pageId
;
if
(
pData
->
num
>=
numOfRowsPerPage
)
{
// release current page first, and prepare the next one
releaseResBufPageInfo
(
pResultBuf
,
pi
);
pData
=
getNewDataBuf
(
pResultBuf
,
sid
,
&
pageId
);
if
(
pData
!=
NULL
)
{
assert
(
pData
->
num
==
0
);
// number of elements must be 0 for new allocated buffer
...
...
@@ -497,6 +501,8 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult
if
(
pWindowRes
->
pos
.
pageId
==
-
1
)
{
// not allocated yet, allocate new buffer
pWindowRes
->
pos
.
pageId
=
pageId
;
pWindowRes
->
pos
.
rowId
=
pData
->
num
++
;
assert
(
pWindowRes
->
pos
.
pageId
>=
0
);
}
return
0
;
...
...
@@ -2111,9 +2117,6 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle,
}
if
(
!
needToLoadDataBlock
(
pRuntimeEnv
,
*
pStatis
,
pRuntimeEnv
->
pCtx
,
pBlockInfo
->
rows
))
{
#if defined(_DEBUG_VIEW)
qDebug
(
"QInfo:%p block discarded by per-filter"
,
GET_QINFO_ADDR
(
pRuntimeEnv
));
#endif
// current block has been discard due to filter applied
pRuntimeEnv
->
summary
.
discardBlocks
+=
1
;
qDebug
(
"QInfo:%p data block discard, brange:%"
PRId64
"-%"
PRId64
", rows:%d"
,
GET_QINFO_ADDR
(
pRuntimeEnv
),
...
...
@@ -2446,6 +2449,8 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQLFunctionCtx
*
pCtx
=
pRuntimeEnv
->
pCtx
;
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pWindowRes
->
pos
.
pageId
);
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
base
.
functionId
;
if
(
!
mergeFlag
)
{
...
...
@@ -2458,7 +2463,7 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes
pCtx
[
i
].
hasNull
=
true
;
pCtx
[
i
].
nStartQueryTimestamp
=
timestamp
;
pCtx
[
i
].
aInputElemBuf
=
getPosInResultPage
(
pRuntimeEnv
,
i
,
pWindowRes
);
pCtx
[
i
].
aInputElemBuf
=
getPosInResultPage
(
pRuntimeEnv
,
i
,
pWindowRes
,
page
);
// in case of tag column, the tag information should be extracted from input buffer
if
(
functionId
==
TSDB_FUNC_TAG_DUMMY
||
functionId
==
TSDB_FUNC_TAG
)
{
...
...
@@ -2615,14 +2620,16 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param)
SWindowResInfo
*
pWindowResInfo1
=
&
supporter
->
pTableQueryInfo
[
left
]
->
windowResInfo
;
SWindowResult
*
pWindowRes1
=
getWindowResult
(
pWindowResInfo1
,
leftPos
);
tFilePage
*
page1
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pWindowRes1
->
pos
.
pageId
);
char
*
b1
=
getPosInResultPage
(
pRuntimeEnv
,
PRIMARYKEY_TIMESTAMP_COL_INDEX
,
pWindowRes1
);
char
*
b1
=
getPosInResultPage
(
pRuntimeEnv
,
PRIMARYKEY_TIMESTAMP_COL_INDEX
,
pWindowRes1
,
page1
);
TSKEY
leftTimestamp
=
GET_INT64_VAL
(
b1
);
SWindowResInfo
*
pWindowResInfo2
=
&
supporter
->
pTableQueryInfo
[
right
]
->
windowResInfo
;
SWindowResult
*
pWindowRes2
=
getWindowResult
(
pWindowResInfo2
,
rightPos
);
tFilePage
*
page2
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pWindowRes2
->
pos
.
pageId
);
char
*
b2
=
getPosInResultPage
(
pRuntimeEnv
,
PRIMARYKEY_TIMESTAMP_COL_INDEX
,
pWindowRes2
);
char
*
b2
=
getPosInResultPage
(
pRuntimeEnv
,
PRIMARYKEY_TIMESTAMP_COL_INDEX
,
pWindowRes2
,
page2
);
TSKEY
rightTimestamp
=
GET_INT64_VAL
(
b2
);
if
(
leftTimestamp
==
rightTimestamp
)
{
...
...
@@ -2685,35 +2692,26 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
int32_t
id
=
getGroupResultId
(
pQInfo
->
groupIndex
-
1
);
SIDList
list
=
getDataBufPagesIdList
(
pResultBuf
,
pQInfo
->
offset
+
id
);
int32_t
total
=
0
;
int32_t
size
=
taosArrayGetSize
(
list
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
int32_t
*
pgId
=
taosArrayGet
(
list
,
i
);
tFilePage
*
pData
=
getResBufPage
(
pResultBuf
,
*
pgId
);
total
+=
pData
->
num
;
}
int32_t
rows
=
total
;
int32_t
offset
=
0
;
for
(
int32_t
j
=
0
;
j
<
size
;
++
j
)
{
int32_t
*
pgId
=
taosArrayGet
(
list
,
j
);
tFilePage
*
pData
=
getResBufPage
(
pResultBuf
,
*
pg
Id
);
SPageInfo
*
pi
=
*
(
SPageInfo
**
)
taosArrayGet
(
list
,
j
);
tFilePage
*
pData
=
getResBufPage
(
pResultBuf
,
pi
->
page
Id
);
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
int32_t
bytes
=
pRuntimeEnv
->
pCtx
[
i
].
outputBytes
;
char
*
pDest
=
pQuery
->
sdata
[
i
]
->
data
;
memcpy
(
pDest
+
offset
*
bytes
,
pData
->
data
+
pRuntimeEnv
->
offset
[
i
]
*
pData
->
num
,
bytes
*
pData
->
num
);
memcpy
(
pDest
+
offset
*
bytes
,
pData
->
data
+
pRuntimeEnv
->
offset
[
i
]
*
pData
->
num
,
bytes
*
pData
->
num
);
}
// rows += pData->num;
offset
+=
pData
->
num
;
}
assert
(
pQuery
->
rec
.
rows
==
0
);
pQuery
->
rec
.
rows
+=
rows
;
pQuery
->
rec
.
rows
+=
offset
;
pQInfo
->
offset
+=
1
;
}
...
...
@@ -2777,7 +2775,6 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
assert
(
pQInfo
->
numOfGroupResultPages
==
0
);
return
0
;
}
else
if
(
numOfTables
==
1
)
{
// no need to merge results since only one table in each group
}
SCompSupporter
cs
=
{
pTableList
,
posList
,
pQInfo
};
...
...
@@ -2802,8 +2799,9 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
SWindowResInfo
*
pWindowResInfo
=
&
pTableList
[
pos
]
->
windowResInfo
;
SWindowResult
*
pWindowRes
=
getWindowResult
(
pWindowResInfo
,
cs
.
position
[
pos
]);
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pWindowRes
->
pos
.
pageId
);
char
*
b
=
getPosInResultPage
(
pRuntimeEnv
,
PRIMARYKEY_TIMESTAMP_COL_INDEX
,
pWindowRes
);
char
*
b
=
getPosInResultPage
(
pRuntimeEnv
,
PRIMARYKEY_TIMESTAMP_COL_INDEX
,
pWindowRes
,
page
);
TSKEY
ts
=
GET_INT64_VAL
(
b
);
assert
(
ts
==
pWindowRes
->
window
.
skey
);
...
...
@@ -3517,9 +3515,11 @@ void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pResult
->
pos
.
pageId
);
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
SQLFunctionCtx
*
pCtx
=
&
pRuntimeEnv
->
pCtx
[
i
];
pCtx
->
aOutputBuf
=
getPosInResultPage
(
pRuntimeEnv
,
i
,
pResult
);
pCtx
->
aOutputBuf
=
getPosInResultPage
(
pRuntimeEnv
,
i
,
pResult
,
page
);
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
base
.
functionId
;
if
(
functionId
==
TSDB_FUNC_TOP
||
functionId
==
TSDB_FUNC_BOTTOM
||
functionId
==
TSDB_FUNC_DIFF
)
{
...
...
@@ -3542,6 +3542,8 @@ void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
tFilePage
*
bufPage
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pResult
->
pos
.
pageId
);
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
SQLFunctionCtx
*
pCtx
=
&
pRuntimeEnv
->
pCtx
[
i
];
...
...
@@ -3550,7 +3552,7 @@ void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
continue
;
}
pCtx
->
aOutputBuf
=
getPosInResultPage
(
pRuntimeEnv
,
i
,
pResult
);
pCtx
->
aOutputBuf
=
getPosInResultPage
(
pRuntimeEnv
,
i
,
pResult
,
bufPage
);
pCtx
->
currentStage
=
0
;
int32_t
functionId
=
pCtx
->
functionId
;
...
...
@@ -3713,11 +3715,13 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_
pQInfo
->
groupIndex
+=
1
;
}
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
result
[
i
].
pos
.
pageId
);
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
int32_t
size
=
pRuntimeEnv
->
pCtx
[
j
].
outputBytes
;
char
*
out
=
pQuery
->
sdata
[
j
]
->
data
+
numOfResult
*
size
;
char
*
in
=
getPosInResultPage
(
pRuntimeEnv
,
j
,
&
result
[
i
]);
char
*
in
=
getPosInResultPage
(
pRuntimeEnv
,
j
,
&
result
[
i
]
,
page
);
memcpy
(
out
,
in
+
oldOffset
*
size
,
size
*
numOfRowsToCopy
);
}
...
...
@@ -4240,8 +4244,8 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
getIntermediateBufInfo
(
pRuntimeEnv
,
&
ps
,
&
rowsize
);
if
(
isSTableQuery
&&
!
onlyQueryTags
(
pRuntimeEnv
->
pQuery
))
{
int32_t
numOfPages
=
getInitialPageNum
(
pQInfo
);
code
=
createDiskbasedResultBuffer
(
&
pRuntimeEnv
->
pResultBuf
,
numOfPages
,
rowsize
,
ps
,
numOfPages
,
pQInfo
);
//
int32_t numOfPages = getInitialPageNum(pQInfo);
code
=
createDiskbasedResultBuffer
(
&
pRuntimeEnv
->
pResultBuf
,
2
,
rowsize
,
ps
,
2
,
pQInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
src/query/src/qResultbuf.c
浏览文件 @
b04e3da3
#include "qResultbuf.h"
#include
<stddef.h>
#include
<tscompression.h>
#include
"stddef.h"
#include
"tscompression.h"
#include "hash.h"
#include "qExtbuffer.h"
#include "queryLog.h"
#include "taoserror.h"
#define GET_DATA_PAYLOAD(_p) ((_p)->pData + POINTER_BYTES)
int32_t
createDiskbasedResultBuffer
(
SDiskbasedResultBuf
**
pResultBuf
,
int32_t
numOfPages
,
int32_t
rowSize
,
int32_t
pagesize
,
int32_t
inMemPages
,
const
void
*
handle
)
{
...
...
@@ -15,23 +17,22 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t nu
return
TSDB_CODE_COM_OUT_OF_MEMORY
;
}
pResBuf
->
pageSize
=
pagesize
;
pResBuf
->
numOfPages
=
0
;
// all pages are in buffer in the first place
pResBuf
->
inMemPages
=
inMemPages
;
pResBuf
->
pageSize
=
pagesize
;
pResBuf
->
numOfPages
=
0
;
// all pages are in buffer in the first place
pResBuf
->
inMemPages
=
inMemPages
;
pResBuf
->
totalBufSize
=
pResBuf
->
numOfPages
*
pagesize
;
pResBuf
->
allocateId
=
-
1
;
pResBuf
->
comp
=
true
;
assert
(
inMemPages
<=
numOfPages
);
pResBuf
->
numOfRowsPerPage
=
(
pagesize
-
sizeof
(
tFilePage
))
/
rowSize
;
pResBuf
->
totalBufSize
=
pResBuf
->
numOfPages
*
pagesize
;
pResBuf
->
allocateId
=
-
1
;
pResBuf
->
lruList
=
tdListNew
(
POINTER_BYTES
);
// init id hash table
pResBuf
->
groupSet
=
taosHashInit
(
10
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
);
pResBuf
->
all
=
taosHashInit
(
10
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
);
pResBuf
->
assistBuf
=
malloc
(
pResBuf
->
pageSize
+
2
);
// EXTRA BYTES
pResBuf
->
comp
=
true
;
char
path
[
PATH_MAX
]
=
{
0
};
getTmpfilePath
(
"qbuf"
,
path
);
...
...
@@ -49,7 +50,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t nu
#define NUM_OF_PAGES_ON_DISK(_r) ((_r)->numOfPages - (_r)->inMemPages)
#define FILE_SIZE_ON_DISK(_r) (NUM_OF_PAGES_ON_DISK(_r) * (_r)->pageSize)
static
int32_t
createDisk
ResidesBuf
(
SDiskbasedResultBuf
*
pResultBuf
)
{
static
int32_t
createDisk
File
(
SDiskbasedResultBuf
*
pResultBuf
)
{
pResultBuf
->
file
=
fopen
(
pResultBuf
->
path
,
"wb+"
);
if
(
pResultBuf
->
file
==
NULL
)
{
qError
(
"failed to create tmp file: %s on disk. %s"
,
pResultBuf
->
path
,
strerror
(
errno
));
...
...
@@ -59,15 +60,27 @@ static int32_t createDiskResidesBuf(SDiskbasedResultBuf* pResultBuf) {
return
TSDB_CODE_SUCCESS
;
}
static
char
*
doCompressData
(
void
*
data
,
int32_t
srcSize
,
int32_t
*
dst
,
bool
comp
,
void
*
assis
tBuf
)
{
// do nothing
if
(
!
comp
)
{
static
char
*
doCompressData
(
void
*
data
,
int32_t
srcSize
,
int32_t
*
dst
,
SDiskbasedResultBuf
*
pResul
tBuf
)
{
// do nothing
if
(
!
pResultBuf
->
comp
)
{
*
dst
=
srcSize
;
return
data
;
}
*
dst
=
tsCompressString
(
data
,
srcSize
,
1
,
assistBuf
,
srcSize
,
ONE_STAGE_COMP
,
NULL
,
0
);
*
dst
=
tsCompressString
(
data
,
srcSize
,
1
,
pResultBuf
->
assistBuf
,
srcSize
,
ONE_STAGE_COMP
,
NULL
,
0
);
memcpy
(
data
,
assistBuf
,
*
dst
);
memcpy
(
data
,
pResultBuf
->
assistBuf
,
*
dst
);
return
data
;
}
static
char
*
doDecompressData
(
void
*
data
,
int32_t
srcSize
,
int32_t
*
dst
,
SDiskbasedResultBuf
*
pResultBuf
)
{
// do nothing
if
(
!
pResultBuf
->
comp
)
{
*
dst
=
srcSize
;
return
data
;
}
*
dst
=
tsDecompressString
(
data
,
srcSize
,
1
,
pResultBuf
->
assistBuf
,
pResultBuf
->
pageSize
,
ONE_STAGE_COMP
,
NULL
,
0
);
memcpy
(
data
,
pResultBuf
->
assistBuf
,
*
dst
);
return
data
;
}
...
...
@@ -96,11 +109,10 @@ static int32_t allocatePositionInFile(SDiskbasedResultBuf* pResultBuf, size_t si
}
static
char
*
doFlushPageToDisk
(
SDiskbasedResultBuf
*
pResultBuf
,
SPageInfo
*
pg
)
{
assert
(
T_REF_VAL_GET
(
pg
)
==
0
&&
pg
->
pData
!=
NULL
);
assert
(
!
pg
->
used
&&
pg
->
pData
!=
NULL
);
int32_t
size
=
-
1
;
char
*
t
=
doCompressData
(
pg
->
pData
+
POINTER_BYTES
,
pResultBuf
->
pageSize
,
&
size
,
pResultBuf
->
comp
,
pResultBuf
->
assistBuf
);
pg
->
info
.
length
=
size
;
char
*
t
=
doCompressData
(
GET_DATA_PAYLOAD
(
pg
),
pResultBuf
->
pageSize
,
&
size
,
pResultBuf
);
// this page is flushed to disk for the first time
if
(
pg
->
info
.
offset
==
-
1
)
{
...
...
@@ -108,26 +120,30 @@ static char* doFlushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) {
pResultBuf
->
nextPos
+=
size
;
fseek
(
pResultBuf
->
file
,
pg
->
info
.
offset
,
SEEK_SET
);
int32_t
ret
=
fwrite
(
t
,
1
,
size
,
pResultBuf
->
file
);
UNUSED
(
ret
);
/*int32_t ret =*/
fwrite
(
t
,
1
,
size
,
pResultBuf
->
file
);
}
else
{
if
(
pg
->
info
.
length
<
size
)
{
// length becomes greater, current space is not enough, allocate new place.
//1. add current space to free list
// length becomes greater, current space is not enough, allocate new place, otherwise, do nothing
if
(
pg
->
info
.
length
<
size
)
{
// 1. add current space to free list
taosArrayPush
(
pResultBuf
->
pFree
,
&
pg
->
info
);
//2. allocate new position, and update the info
//
2. allocate new position, and update the info
pg
->
info
.
offset
=
allocatePositionInFile
(
pResultBuf
,
size
);
pResultBuf
->
nextPos
+=
size
;
//3. write to disk.
fseek
(
pResultBuf
->
file
,
pg
->
info
.
offset
,
SEEK_SET
);
fwrite
(
t
,
size
,
1
,
pResultBuf
->
file
);
}
//3. write to disk.
fseek
(
pResultBuf
->
file
,
pg
->
info
.
offset
,
SEEK_SET
);
fwrite
(
t
,
size
,
1
,
pResultBuf
->
file
);
}
char
*
ret
=
pg
->
pData
;
memset
(
ret
,
0
,
pResultBuf
->
pageSize
);
pg
->
pData
=
NULL
;
pg
->
info
.
length
=
size
;
pResultBuf
->
statis
.
flushBytes
+=
pg
->
info
.
length
;
return
ret
;
}
...
...
@@ -137,7 +153,7 @@ static char* flushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) {
assert
(
pResultBuf
->
numOfPages
*
pResultBuf
->
pageSize
==
pResultBuf
->
totalBufSize
&&
pResultBuf
->
numOfPages
>=
pResultBuf
->
inMemPages
);
if
(
pResultBuf
->
file
==
NULL
)
{
if
((
ret
=
createDisk
ResidesBuf
(
pResultBuf
))
!=
TSDB_CODE_SUCCESS
)
{
if
((
ret
=
createDisk
File
(
pResultBuf
))
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
ret
;
return
NULL
;
}
...
...
@@ -146,12 +162,29 @@ static char* flushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) {
return
doFlushPageToDisk
(
pResultBuf
,
pg
);
}
// load file block data in disk
static
char
*
loadPageFromDisk
(
SDiskbasedResultBuf
*
pResultBuf
,
SPageInfo
*
pg
)
{
int32_t
ret
=
fseek
(
pResultBuf
->
file
,
pg
->
info
.
offset
,
SEEK_SET
);
ret
=
fread
(
GET_DATA_PAYLOAD
(
pg
),
1
,
pg
->
info
.
length
,
pResultBuf
->
file
);
if
(
ret
!=
pg
->
info
.
length
)
{
terrno
=
errno
;
return
NULL
;
}
pResultBuf
->
statis
.
loadBytes
+=
pg
->
info
.
length
;
int32_t
fullSize
=
0
;
doDecompressData
(
GET_DATA_PAYLOAD
(
pg
),
pg
->
info
.
length
,
&
fullSize
,
pResultBuf
);
return
GET_DATA_PAYLOAD
(
pg
);
}
#define NO_AVAILABLE_PAGES(_b) ((_b)->numOfPages >= (_b)->inMemPages)
static
SIDList
addNewGroup
(
SDiskbasedResultBuf
*
pResultBuf
,
int32_t
groupId
)
{
assert
(
taosHashGet
(
pResultBuf
->
groupSet
,
(
const
char
*
)
&
groupId
,
sizeof
(
int32_t
))
==
NULL
);
SArray
*
pa
=
taosArrayInit
(
1
,
sizeof
(
SPageInfo
)
);
SArray
*
pa
=
taosArrayInit
(
1
,
POINTER_BYTES
);
int32_t
ret
=
taosHashPut
(
pResultBuf
->
groupSet
,
(
const
char
*
)
&
groupId
,
sizeof
(
int32_t
),
&
pa
,
POINTER_BYTES
);
assert
(
ret
==
0
);
...
...
@@ -170,44 +203,79 @@ static SPageInfo* registerPage(SDiskbasedResultBuf* pResultBuf, int32_t groupId,
pResultBuf
->
numOfPages
+=
1
;
SPageInfo
ppi
=
{
.
info
=
PAGE_INFO_INITIALIZER
,
.
pageId
=
pageId
,
};
return
taosArrayPush
(
list
,
&
ppi
);
SPageInfo
*
ppi
=
malloc
(
sizeof
(
SPageInfo
));
//{ .info = PAGE_INFO_INITIALIZER, .pageId = pageId, .pn = NULL};
ppi
->
info
=
PAGE_INFO_INITIALIZER
;
ppi
->
pageId
=
pageId
;
ppi
->
pData
=
NULL
;
ppi
->
pn
=
NULL
;
ppi
->
used
=
true
;
return
*
(
SPageInfo
**
)
taosArrayPush
(
list
,
&
ppi
);
}
tFilePage
*
getNewDataBuf
(
SDiskbasedResultBuf
*
pResultBuf
,
int32_t
groupId
,
int32_t
*
pageId
)
{
char
*
allocPg
=
NULL
;
static
SListNode
*
getEldestUnrefedPage
(
SDiskbasedResultBuf
*
pResultBuf
)
{
SListIter
iter
=
{
0
};
tdListInitIter
(
pResultBuf
->
lruList
,
&
iter
,
TD_LIST_BACKWARD
);
if
(
NO_AVAILABLE_PAGES
(
pResultBuf
))
{
SListNode
*
pn
=
NULL
;
while
((
pn
=
tdListNext
(
&
iter
))
!=
NULL
)
{
assert
(
pn
!=
NULL
);
// get the last page in linked list
SListIter
iter
=
{
0
};
tdListInitIter
(
pResultBuf
->
lruList
,
&
iter
,
TD_LIST_BACKWARD
);
SPageInfo
*
pageInfo
=
*
(
SPageInfo
**
)
pn
->
data
;
assert
(
pageInfo
->
pageId
>=
0
&&
pageInfo
->
pn
==
pn
);
SListNode
*
pn
=
NULL
;
while
((
pn
=
tdListNext
(
&
iter
))
!=
NULL
)
{
assert
(
pn
!=
NULL
);
if
(
T_REF_VAL_GET
(
*
(
SPageInfo
**
)
pn
->
data
)
==
0
)
{
break
;
}
if
(
!
pageInfo
->
used
)
{
break
;
}
}
// all pages are referenced by user, try to allocate new space
if
(
pn
==
NULL
)
{
int32_t
prev
=
pResultBuf
->
inMemPages
;
pResultBuf
->
inMemPages
=
pResultBuf
->
inMemPages
*
1
.
5
;
return
pn
;
}
static
char
*
evicOneDataPage
(
SDiskbasedResultBuf
*
pResultBuf
)
{
char
*
bufPage
=
NULL
;
SListNode
*
pn
=
getEldestUnrefedPage
(
pResultBuf
);
qWarn
(
"%p in memory buf page not sufficient, expand from %d to %d, page size:%d"
,
pResultBuf
,
prev
,
// all pages are referenced by user, try to allocate new space
if
(
pn
==
NULL
)
{
int32_t
prev
=
pResultBuf
->
inMemPages
;
pResultBuf
->
inMemPages
=
pResultBuf
->
inMemPages
*
1
.
5
;
qWarn
(
"%p in memory buf page not sufficient, expand from %d to %d, page size:%d"
,
pResultBuf
,
prev
,
pResultBuf
->
inMemPages
,
pResultBuf
->
pageSize
);
}
else
{
tdListPopNode
(
pResultBuf
->
lruList
,
pn
);
SPageInfo
*
d
=
*
(
SPageInfo
**
)
pn
->
data
;
tfree
(
pn
);
}
else
{
pResultBuf
->
statis
.
flushPages
+=
1
;
tdListPopNode
(
pResultBuf
->
lruList
,
pn
);
allocPg
=
flushPageToDisk
(
pResultBuf
,
d
);
if
(
allocPg
==
NULL
)
{
return
NULL
;
}
}
SPageInfo
*
d
=
*
(
SPageInfo
**
)
pn
->
data
;
assert
(
d
->
pn
==
pn
);
d
->
pn
=
NULL
;
tfree
(
pn
);
bufPage
=
flushPageToDisk
(
pResultBuf
,
d
);
}
return
bufPage
;
}
static
void
lruListPushFront
(
SList
*
pList
,
SPageInfo
*
pi
)
{
tdListPrepend
(
pList
,
&
pi
);
SListNode
*
front
=
tdListGetHead
(
pList
);
pi
->
pn
=
front
;
}
static
void
lruListMoveToFront
(
SList
*
pList
,
SPageInfo
*
pi
)
{
tdListPopNode
(
pList
,
pi
->
pn
);
tdListPrependNode
(
pList
,
pi
->
pn
);
}
tFilePage
*
getNewDataBuf
(
SDiskbasedResultBuf
*
pResultBuf
,
int32_t
groupId
,
int32_t
*
pageId
)
{
pResultBuf
->
statis
.
getPages
+=
1
;
char
*
availablePage
=
NULL
;
if
(
NO_AVAILABLE_PAGES
(
pResultBuf
))
{
availablePage
=
evicOneDataPage
(
pResultBuf
);
}
// register new id in this group
...
...
@@ -216,111 +284,72 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32
// register page id info
SPageInfo
*
pi
=
registerPage
(
pResultBuf
,
groupId
,
*
pageId
);
assert
(
pResultBuf
->
inMemPages
>
0
);
// add to LRU list
assert
(
listNEles
(
pResultBuf
->
lruList
)
<
pResultBuf
->
inMemPages
);
tdListPrepend
(
pResultBuf
->
lruList
,
&
pi
);
lruListPushFront
(
pResultBuf
->
lruList
,
pi
);
// add to hash map
taosHashPut
(
pResultBuf
->
all
,
pageId
,
sizeof
(
int32_t
),
&
pi
,
POINTER_BYTES
);
// allocate buf
if
(
a
llocPg
==
NULL
)
{
if
(
a
vailablePage
==
NULL
)
{
pi
->
pData
=
calloc
(
1
,
pResultBuf
->
pageSize
+
POINTER_BYTES
);
}
else
{
pi
->
pData
=
a
llocPg
;
pi
->
pData
=
a
vailablePage
;
}
pResultBuf
->
totalBufSize
+=
pResultBuf
->
pageSize
;
T_REF_INC
(
pi
);
// add ref count
((
void
**
)
pi
->
pData
)[
0
]
=
pi
;
pi
->
used
=
true
;
return
pi
->
pData
+
POINTER_BYTES
;
return
GET_DATA_PAYLOAD
(
pi
)
;
}
tFilePage
*
getResBufPage
(
SDiskbasedResultBuf
*
pResultBuf
,
int32_t
id
)
{
assert
(
pResultBuf
!=
NULL
&&
id
>=
0
);
pResultBuf
->
statis
.
getPages
+=
1
;
SPageInfo
**
pi
=
taosHashGet
(
pResultBuf
->
all
,
&
id
,
sizeof
(
int32_t
));
assert
(
pi
!=
NULL
&&
*
pi
!=
NULL
);
if
((
*
pi
)
->
pData
!=
NULL
)
{
// it is in memory
// no need to update the LRU list
// no need to update the LRU list
if only one page exists
if
(
pResultBuf
->
numOfPages
==
1
)
{
return
(
*
pi
)
->
pData
+
POINTER_BYTES
;
(
*
pi
)
->
used
=
true
;
return
GET_DATA_PAYLOAD
(
*
pi
);
}
SListNode
*
pnode
=
NULL
;
// todo speed up
SListIter
iter
=
{
0
};
tdListInitIter
(
pResultBuf
->
lruList
,
&
iter
,
TD_LIST_FORWARD
);
SPageInfo
**
pInfo
=
(
SPageInfo
**
)
((
*
pi
)
->
pn
->
data
);
assert
(
*
pInfo
==
*
pi
);
while
((
pnode
=
tdListNext
(
&
iter
))
!=
NULL
)
{
SPageInfo
**
pInfo
=
(
SPageInfo
**
)
pnode
->
data
;
lruListMoveToFront
(
pResultBuf
->
lruList
,
(
*
pi
));
(
*
pi
)
->
used
=
true
;
// remove it and add it into the front of linked-list
if
((
*
pInfo
)
->
pageId
==
id
)
{
tdListPopNode
(
pResultBuf
->
lruList
,
pnode
);
tdListPrependNode
(
pResultBuf
->
lruList
,
pnode
);
T_REF_INC
(
*
(
SPageInfo
**
)
pnode
->
data
);
return
GET_DATA_PAYLOAD
(
*
pi
);
return
((
*
(
SPageInfo
**
)
pnode
->
data
)
->
pData
+
POINTER_BYTES
);
}
}
}
else
{
// not in memory
assert
((
*
pi
)
->
pData
==
NULL
&&
(
*
pi
)
->
info
.
length
>=
0
&&
(
*
pi
)
->
info
.
offset
>=
0
);
// choose the be flushed page: get the last page in linked list
SListIter
iter1
=
{
0
};
tdListInitIter
(
pResultBuf
->
lruList
,
&
iter1
,
TD_LIST_BACKWARD
);
assert
((
*
pi
)
->
pData
==
NULL
&&
(
*
pi
)
->
pn
==
NULL
&&
(
*
pi
)
->
info
.
length
>=
0
&&
(
*
pi
)
->
info
.
offset
>=
0
);
SListNode
*
pn
=
NULL
;
while
((
pn
=
tdListNext
(
&
iter1
))
!=
NULL
)
{
assert
(
pn
!=
NULL
);
if
(
T_REF_VAL_GET
(
*
(
SPageInfo
**
)(
pn
->
data
))
==
0
)
{
break
;
}
char
*
availablePage
=
NULL
;
if
(
NO_AVAILABLE_PAGES
(
pResultBuf
))
{
availablePage
=
evicOneDataPage
(
pResultBuf
);
}
// all pages are referenced by user, try to allocate new space
if
(
pn
==
NULL
)
{
int32_t
prev
=
pResultBuf
->
inMemPages
;
pResultBuf
->
inMemPages
=
pResultBuf
->
inMemPages
*
1
.
5
;
qWarn
(
"%p in memory buf page not sufficient, expand from %d to %d, page size:%d"
,
pResultBuf
,
prev
,
pResultBuf
->
inMemPages
,
pResultBuf
->
pageSize
);
if
(
availablePage
==
NULL
)
{
(
*
pi
)
->
pData
=
calloc
(
1
,
pResultBuf
->
pageSize
+
POINTER_BYTES
);
}
else
{
tdListPopNode
(
pResultBuf
->
lruList
,
pn
);
if
(
flushPageToDisk
(
pResultBuf
,
*
(
SPageInfo
**
)
pn
->
data
)
!=
TSDB_CODE_SUCCESS
)
{
return
NULL
;
}
char
*
buf
=
(
*
(
SPageInfo
**
)
pn
->
data
)
->
pData
;
(
*
(
SPageInfo
**
)
pn
->
data
)
->
pData
=
NULL
;
(
*
pi
)
->
pData
=
buf
;
((
void
**
)((
*
pi
)
->
pData
))[
0
]
=
(
*
pi
);
tfree
(
pn
);
}
// load file in disk
int32_t
ret
=
fseek
(
pResultBuf
->
file
,
(
*
pi
)
->
info
.
offset
,
SEEK_SET
);
ret
=
fread
((
*
pi
)
->
pData
+
POINTER_BYTES
,
1
,
(
*
pi
)
->
info
.
length
,
pResultBuf
->
file
);
if
(
ret
!=
(
*
pi
)
->
info
.
length
)
{
terrno
=
errno
;
return
NULL
;
(
*
pi
)
->
pData
=
availablePage
;
}
// todo do decomp
((
void
**
)((
*
pi
)
->
pData
))[
0
]
=
(
*
pi
);
return
(
*
pi
)
->
pData
+
POINTER_BYTES
;
lruListPushFront
(
pResultBuf
->
lruList
,
*
pi
);
loadPageFromDisk
(
pResultBuf
,
*
pi
);
return
GET_DATA_PAYLOAD
(
*
pi
);
}
return
NULL
;
}
void
releaseResBufPage
(
SDiskbasedResultBuf
*
pResultBuf
,
void
*
page
)
{
...
...
@@ -328,9 +357,14 @@ void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page) {
char
*
p
=
(
char
*
)
page
-
POINTER_BYTES
;
SPageInfo
*
ppi
=
((
SPageInfo
**
)
p
)[
0
];
releaseResBufPageInfo
(
pResultBuf
,
ppi
);
}
assert
(
T_REF_VAL_GET
(
ppi
)
>
0
);
T_REF_DEC
(
ppi
);
void
releaseResBufPageInfo
(
SDiskbasedResultBuf
*
pResultBuf
,
SPageInfo
*
pi
)
{
assert
(
pi
->
pData
!=
NULL
&&
pi
->
used
);
pi
->
used
=
false
;
pResultBuf
->
statis
.
releasePages
+=
1
;
}
size_t
getNumOfRowsPerPage
(
const
SDiskbasedResultBuf
*
pResultBuf
)
{
return
pResultBuf
->
numOfRowsPerPage
;
}
...
...
@@ -373,9 +407,11 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) {
SArray
**
p
=
(
SArray
**
)
taosHashIterGet
(
iter
);
size_t
n
=
taosArrayGetSize
(
*
p
);
for
(
int32_t
i
=
0
;
i
<
n
;
++
i
)
{
SPageInfo
*
pi
=
taosArrayGet
(
*
p
,
i
);
SPageInfo
*
pi
=
taosArrayGet
P
(
*
p
,
i
);
tfree
(
pi
->
pData
);
tfree
(
pi
);
}
taosArrayDestroy
(
*
p
);
}
...
...
@@ -390,8 +426,8 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) {
tfree
(
pResultBuf
);
}
int32_t
getLastPageId
(
SIDList
pList
)
{
SPageInfo
*
getLastPageInfo
(
SIDList
pList
)
{
size_t
size
=
taosArrayGetSize
(
pList
);
return
*
(
int32_t
*
)
taosArrayGet
(
pList
,
size
-
1
);
return
(
SPageInfo
*
)
taosArrayGetP
(
pList
,
size
-
1
);
}
src/query/src/qUtil.c
浏览文件 @
b04e3da3
...
...
@@ -233,11 +233,13 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
if
(
pWindowRes
==
NULL
)
{
return
;
}
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pWindowRes
->
pos
.
pageId
);
for
(
int32_t
i
=
0
;
i
<
pRuntimeEnv
->
pQuery
->
numOfOutput
;
++
i
)
{
SResultInfo
*
pResultInfo
=
&
pWindowRes
->
resultInfo
[
i
];
char
*
s
=
getPosInResultPage
(
pRuntimeEnv
,
i
,
pWindowRes
);
char
*
s
=
getPosInResultPage
(
pRuntimeEnv
,
i
,
pWindowRes
,
page
);
size_t
size
=
pRuntimeEnv
->
pQuery
->
pSelectExpr
[
i
].
bytes
;
memset
(
s
,
0
,
size
);
...
...
@@ -274,8 +276,11 @@ void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, con
memcpy
(
pDst
->
interResultBuf
,
pSrc
->
interResultBuf
,
pDst
->
bufLen
);
// copy the output buffer data from src to dst, the position info keep unchanged
char
*
dstBuf
=
getPosInResultPage
(
pRuntimeEnv
,
i
,
dst
);
char
*
srcBuf
=
getPosInResultPage
(
pRuntimeEnv
,
i
,
(
SWindowResult
*
)
src
);
tFilePage
*
dstpage
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
dst
->
pos
.
pageId
);
char
*
dstBuf
=
getPosInResultPage
(
pRuntimeEnv
,
i
,
dst
,
dstpage
);
tFilePage
*
srcpage
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
src
->
pos
.
pageId
);
char
*
srcBuf
=
getPosInResultPage
(
pRuntimeEnv
,
i
,
(
SWindowResult
*
)
src
,
srcpage
);
size_t
s
=
pRuntimeEnv
->
pQuery
->
pSelectExpr
[
i
].
bytes
;
memcpy
(
dstBuf
,
srcBuf
,
s
);
...
...
src/query/tests/resultBufferTest.cpp
浏览文件 @
b04e3da3
...
...
@@ -94,7 +94,59 @@ void writeDownTest() {
SArray
*
pa
=
getDataBufPagesIdList
(
pResultBuf
,
groupId
);
ASSERT_EQ
(
taosArrayGetSize
(
pa
),
5
);
destroyResultBuf
(
pResultBuf
,
NULL
);
}
void
recyclePageTest
()
{
SDiskbasedResultBuf
*
pResultBuf
=
NULL
;
int32_t
ret
=
createDiskbasedResultBuffer
(
&
pResultBuf
,
1000
,
64
,
1024
,
4
,
NULL
);
int32_t
pageId
=
0
;
int32_t
writePageId
=
0
;
int32_t
groupId
=
0
;
int32_t
nx
=
12345
;
tFilePage
*
pBufPage
=
getNewDataBuf
(
pResultBuf
,
groupId
,
&
pageId
);
ASSERT_TRUE
(
pBufPage
!=
NULL
);
releaseResBufPage
(
pResultBuf
,
pBufPage
);
tFilePage
*
pBufPage1
=
getNewDataBuf
(
pResultBuf
,
groupId
,
&
pageId
);
tFilePage
*
t1
=
getResBufPage
(
pResultBuf
,
pageId
);
ASSERT_TRUE
(
t1
==
pBufPage1
);
ASSERT_TRUE
(
pageId
==
1
);
tFilePage
*
pBufPage2
=
getNewDataBuf
(
pResultBuf
,
groupId
,
&
pageId
);
tFilePage
*
t2
=
getResBufPage
(
pResultBuf
,
pageId
);
ASSERT_TRUE
(
t2
==
pBufPage2
);
ASSERT_TRUE
(
pageId
==
2
);
tFilePage
*
pBufPage3
=
getNewDataBuf
(
pResultBuf
,
groupId
,
&
pageId
);
tFilePage
*
t3
=
getResBufPage
(
pResultBuf
,
pageId
);
ASSERT_TRUE
(
t3
==
pBufPage3
);
ASSERT_TRUE
(
pageId
==
3
);
tFilePage
*
pBufPage4
=
getNewDataBuf
(
pResultBuf
,
groupId
,
&
pageId
);
tFilePage
*
t4
=
getResBufPage
(
pResultBuf
,
pageId
);
ASSERT_TRUE
(
t4
==
pBufPage4
);
ASSERT_TRUE
(
pageId
==
4
);
releaseResBufPage
(
pResultBuf
,
t4
);
releaseResBufPage
(
pResultBuf
,
t4
);
tFilePage
*
pBufPage5
=
getNewDataBuf
(
pResultBuf
,
groupId
,
&
pageId
);
tFilePage
*
t5
=
getResBufPage
(
pResultBuf
,
pageId
);
ASSERT_TRUE
(
t5
==
pBufPage5
);
ASSERT_TRUE
(
pageId
==
5
);
// flush the written page to disk, and read it out again
tFilePage
*
pBufPagex
=
getResBufPage
(
pResultBuf
,
writePageId
);
*
(
int32_t
*
)(
pBufPagex
->
data
)
=
nx
;
writePageId
=
pageId
;
// update the data
releaseResBufPage
(
pResultBuf
,
pBufPagex
);
tFilePage
*
pBufPagex1
=
getResBufPage
(
pResultBuf
,
1
);
SArray
*
pa
=
getDataBufPagesIdList
(
pResultBuf
,
groupId
);
ASSERT_EQ
(
taosArrayGetSize
(
pa
),
6
);
destroyResultBuf
(
pResultBuf
,
NULL
);
}
...
...
@@ -105,4 +157,5 @@ TEST(testCase, resultBufferTest) {
srand
(
time
(
NULL
));
simpleTest
();
writeDownTest
();
recyclePageTest
();
}
src/util/inc/tlist.h
浏览文件 @
b04e3da3
...
...
@@ -55,6 +55,8 @@ int tdListPrepend(SList *list, void *data);
int
tdListAppend
(
SList
*
list
,
void
*
data
);
SListNode
*
tdListPopHead
(
SList
*
list
);
SListNode
*
tdListPopTail
(
SList
*
list
);
SListNode
*
tdListGetHead
(
SList
*
list
);
SListNode
*
tsListGetTail
(
SList
*
list
);
SListNode
*
tdListPopNode
(
SList
*
list
,
SListNode
*
node
);
void
tdListMove
(
SList
*
src
,
SList
*
dst
);
void
tdListDiscard
(
SList
*
list
);
...
...
src/util/src/tlist.c
浏览文件 @
b04e3da3
...
...
@@ -122,6 +122,22 @@ SListNode *tdListPopTail(SList *list) {
return
node
;
}
SListNode
*
tdListGetHead
(
SList
*
list
)
{
if
(
list
==
NULL
||
list
->
numOfEles
==
0
)
{
return
NULL
;
}
return
list
->
head
;
}
SListNode
*
tsListGetTail
(
SList
*
list
)
{
if
(
list
==
NULL
||
list
->
numOfEles
==
0
)
{
return
NULL
;
}
return
list
->
tail
;
}
SListNode
*
tdListPopNode
(
SList
*
list
,
SListNode
*
node
)
{
if
(
list
->
head
==
node
)
{
list
->
head
=
node
->
next
;
...
...
tests/script/general/parser/testSuite.sim
浏览文件 @
b04e3da3
...
...
@@ -93,6 +93,8 @@ run general/parser/groupby.sim
sleep 2000
run general/parser/tags_filter.sim
sleep 2000
run general/parser/topbot.sim
sleep 2000
run general/parser/union.sim
sleep 2000
run general/parser/sliding.sim
...
...
tests/script/general/parser/topbot.sim
0 → 100644
浏览文件 @
b04e3da3
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/exec.sh -n dnode1 -s start
sleep 3000
sql connect
$dbPrefix = tb_db
$tbPrefix = tb_tb
$stbPrefix = tb_stb
$tbNum = 10
$rowNum = 1000
$totalNum = $tbNum * $rowNum
$loops = 200000
$log = 10000
$ts0 = 1537146000000
$delta = 600000
print ========== topbot.sim
$i = 0
$db = $dbPrefix . $i
$stb = $stbPrefix . $i
sql drop database $db -x step1
step1:
sql create database $db cache 16 maxtables 200
print ====== create tables
sql use $db
sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 int)
$i = 0
$ts = $ts0
$halfNum = $tbNum / 2
while $i < $halfNum
$tbId = $i + $halfNum
$tb = $tbPrefix . $i
$tb1 = $tbPrefix . $tbId
sql create table $tb using $stb tags( $i )
sql create table $tb1 using $stb tags( $tbId )
$x = 0
while $x < $rowNum
$xs = $x * $delta
$ts = $ts0 + $xs
$c = $x / 10
$c = $c * 10
$c = $x - $c
$binary = 'binary . $c
$binary = $binary . '
$nchar = 'nchar . $c
$nchar = $nchar . '
sql insert into $tb values ( $ts , $c , $c , $c , $c , $c , $c , true, $binary , $nchar )
sql insert into $tb1 values ( $ts , $c , NULL , $c , NULL , $c , $c , true, $binary , $nchar )
$x = $x + 1
endw
$i = $i + 1
endw
print ====== tables created
sql use $db
##### select from table
print ====== select top/bot from table and check num of rows returned
sql select top(c1, 100) from tb_stb0
if $row != 100 then
return -1
endi
sql select last(c2) from tb_tb9
if $row != 1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录