Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
30975a74
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
30975a74
编写于
12月 04, 2019
作者:
H
hjxilinx
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[tbase-1189]
上级
ae9a948b
变更
5
显示空白变更内容
内联
并排
Showing
5 changed file
with
162 addition
and
68 deletion
+162
-68
src/system/detail/inc/vnodeQueryImpl.h
src/system/detail/inc/vnodeQueryImpl.h
+10
-4
src/system/detail/src/vnodeQueryImpl.c
src/system/detail/src/vnodeQueryImpl.c
+118
-50
src/system/detail/src/vnodeQueryProcess.c
src/system/detail/src/vnodeQueryProcess.c
+24
-9
src/system/detail/src/vnodeRead.c
src/system/detail/src/vnodeRead.c
+9
-4
src/system/detail/src/vnodeShell.c
src/system/detail/src/vnodeShell.c
+1
-1
未找到文件。
src/system/detail/inc/vnodeQueryImpl.h
浏览文件 @
30975a74
...
...
@@ -27,7 +27,13 @@ extern "C" {
#define GET_QINFO_ADDR(x) ((char*)(x)-offsetof(SQInfo, query))
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0)
/*
* set the output buffer page size is 16k
* The page size should be sufficient for at least one output result or intermediate result.
* Some intermediate results may be extremely large, such as top/bottom(100) query.
*/
#define DEFAULT_INTERN_BUF_SIZE 16384L
#define INIT_ALLOCATE_DISK_PAGES 60L
#define DEFAULT_DATA_FILE_MAPPING_PAGES 2L
#define DEFAULT_DATA_FILE_MMAP_WINDOW_SIZE (DEFAULT_DATA_FILE_MAPPING_PAGES * DEFAULT_INTERN_BUF_SIZE)
...
...
@@ -160,7 +166,7 @@ void pointInterpSupporterDestroy(SPointInterpoSupporter* pPointInterpSupport);
void
pointInterpSupporterSetData
(
SQInfo
*
pQInfo
,
SPointInterpoSupporter
*
pPointInterpSupport
);
int64_t
loadRequiredBlockIntoMem
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SPositionInfo
*
position
);
void
doCloseAllOpenedResults
(
SMeterQuerySupportObj
*
pSupporter
);
int32_t
doCloseAllOpenedResults
(
SMeterQuerySupportObj
*
pSupporter
);
void
disableFunctForSuppleScan
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
order
);
void
enableFunctForMasterScan
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
order
);
...
...
@@ -185,7 +191,7 @@ void freeMeterBlockInfoEx(SMeterDataBlockInfoEx* pDataBlockInfoEx, int32_t len);
void
setExecutionContext
(
SMeterQuerySupportObj
*
pSupporter
,
SOutputRes
*
outputRes
,
int32_t
meterIdx
,
int32_t
groupIdx
,
SMeterQueryInfo
*
sqinfo
);
void
setIntervalQueryExecutionContext
(
SMeterQuerySupportObj
*
pSupporter
,
int32_t
meterIdx
,
SMeterQueryInfo
*
sqinfo
);
int32_t
setIntervalQueryExecutionContext
(
SMeterQuerySupportObj
*
pSupporter
,
int32_t
meterIdx
,
SMeterQueryInfo
*
sqinfo
);
int64_t
getQueryStartPositionInCache
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
*
slot
,
int32_t
*
pos
,
bool
ignoreQueryRange
);
int64_t
getNextAccessedKeyInData
(
SQuery
*
pQuery
,
int64_t
*
pPrimaryCol
,
SBlockInfo
*
pBlockInfo
,
int32_t
blockStatus
);
...
...
@@ -224,11 +230,11 @@ void changeMeterQueryInfoForSuppleQuery(SMeterQueryInfo *pMeterQueryInfo, TSKEY
/**
* add the new allocated disk page to meter query info
* the new allocated disk page is used to keep the intermediate (interval) results
*
*
@param pQuery
* @param pMeterQueryInfo
* @param pSupporter
*/
tFilePage
*
addDataPageForMeterQueryInfo
(
SMeterQueryInfo
*
pMeterQueryInfo
,
SMeterQuerySupportObj
*
pSupporter
);
tFilePage
*
addDataPageForMeterQueryInfo
(
S
Query
*
pQuery
,
S
MeterQueryInfo
*
pMeterQueryInfo
,
SMeterQuerySupportObj
*
pSupporter
);
/**
* save the query range data into SMeterQueryInfo
...
...
src/system/detail/src/vnodeQueryImpl.c
浏览文件 @
30975a74
...
...
@@ -67,13 +67,13 @@ static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMete
SBlockInfo
*
pBlockInfo
,
int64_t
*
pPrimaryCol
,
char
*
sdata
,
SField
*
pFields
,
__block_search_fn_t
searchFn
);
static
void
saveResult
(
SMeterQuerySupportObj
*
pSupporter
,
SMeterQueryInfo
*
pMeterQueryInfo
,
int32_t
numOfResult
);
static
int32_t
saveResult
(
SMeterQuerySupportObj
*
pSupporter
,
SMeterQueryInfo
*
pMeterQueryInfo
,
int32_t
numOfResult
);
static
void
applyIntervalQueryOnBlock
(
SMeterQuerySupportObj
*
pSupporter
,
SMeterDataInfo
*
pInfoEx
,
char
*
data
,
int64_t
*
pPrimaryData
,
SBlockInfo
*
pBlockInfo
,
int32_t
blockStatus
,
SField
*
pFields
,
__block_search_fn_t
searchFn
);
static
void
resetMergeResultBuf
(
SQuery
*
pQuery
,
SQLFunctionCtx
*
pCtx
);
static
void
flushFromResultBuf
(
SMeterQuerySupportObj
*
pSupporter
,
const
SQuery
*
pQuery
,
static
int32_t
flushFromResultBuf
(
SMeterQuerySupportObj
*
pSupporter
,
const
SQuery
*
pQuery
,
const
SQueryRuntimeEnv
*
pRuntimeEnv
);
static
void
validateTimestampForSupplementResult
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int64_t
numOfIncrementRes
);
static
void
getBasicCacheInfoSnapshot
(
SQuery
*
pQuery
,
SCacheInfo
*
pCacheInfo
,
int32_t
vid
);
...
...
@@ -413,7 +413,7 @@ char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t vnodeId, int
vnodeSetOpenedFileNames
(
pVnodeFileInfo
);
if
(
doOpenQueryFileData
(
pQInfo
,
pVnodeFileInfo
,
vnodeId
)
!=
TSDB_CODE_SUCCESS
)
{
doCloseOpenedFileData
(
pVnodeFileInfo
);
//
there may be partially open fd, close it
anyway.
doCloseOpenedFileData
(
pVnodeFileInfo
);
//
all the fds may be partially opened, close them
anyway.
return
pVnodeFileInfo
->
pHeaderFileData
;
}
}
...
...
@@ -1291,9 +1291,6 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutputCols
;
++
k
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
k
].
pBase
.
functionId
;
// if (!functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
// continue;
// }
SField
dummyField
=
{
0
};
...
...
@@ -3052,7 +3049,7 @@ static void vnodeRecordAllFiles(SQInfo *pQInfo, int32_t vnodeId) {
sprintf
(
pVnodeFilesInfo
->
dbFilePathPrefix
,
"%s/vnode%d/db/"
,
tsDirectory
,
vnodeId
);
DIR
*
pDir
=
opendir
(
pVnodeFilesInfo
->
dbFilePathPrefix
);
if
(
pDir
==
NULL
)
{
dError
(
"QInfo:%p failed to open directory:%s
"
,
pQInfo
,
pVnodeFilesInfo
->
dbFilePathPrefix
);
dError
(
"QInfo:%p failed to open directory:%s
, %s"
,
pQInfo
,
pVnodeFilesInfo
->
dbFilePathPrefix
,
strerror
(
errno
)
);
return
;
}
...
...
@@ -3921,10 +3918,15 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
return
TSDB_CODE_SERV_OUT_OF_MEMORY
;
}
// set 4k page for each meter
pSupporter
->
numOfPages
=
pSupporter
->
numOfMeters
;
ftruncate
(
pSupporter
->
meterOutputFd
,
pSupporter
->
numOfPages
*
DEFAULT_INTERN_BUF_SIZE
);
ret
=
ftruncate
(
pSupporter
->
meterOutputFd
,
pSupporter
->
numOfPages
*
DEFAULT_INTERN_BUF_SIZE
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
dError
(
"QInfo:%p failed to create intermediate result output file:%s. %s"
,
pQInfo
,
pSupporter
->
extBufFile
,
strerror
(
errno
));
return
TSDB_CODE_SERV_NO_DISKSPACE
;
}
pSupporter
->
runtimeEnv
.
numOfRowsPerPage
=
(
DEFAULT_INTERN_BUF_SIZE
-
sizeof
(
tFilePage
))
/
pQuery
->
rowSize
;
pSupporter
->
lastPageId
=
-
1
;
pSupporter
->
bufSize
=
pSupporter
->
numOfPages
*
DEFAULT_INTERN_BUF_SIZE
;
...
...
@@ -3932,7 +3934,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
pSupporter
->
meterOutputMMapBuf
=
mmap
(
NULL
,
pSupporter
->
bufSize
,
PROT_READ
|
PROT_WRITE
,
MAP_SHARED
,
pSupporter
->
meterOutputFd
,
0
);
if
(
pSupporter
->
meterOutputMMapBuf
==
MAP_FAILED
)
{
dError
(
"QInfo:%p failed to map
data file: %s to disk
. %s"
,
pQInfo
,
pSupporter
->
extBufFile
,
strerror
(
errno
));
dError
(
"QInfo:%p failed to map
temp file: %s
. %s"
,
pQInfo
,
pSupporter
->
extBufFile
,
strerror
(
errno
));
return
TSDB_CODE_SERV_OUT_OF_MEMORY
;
}
}
...
...
@@ -4733,16 +4735,20 @@ int32_t mergeMetersResultToOneGroups(SMeterQuerySupportObj *pSupporter) {
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
int64_t
st
=
taosGetTimestampMs
();
int32_t
ret
=
TSDB_CODE_SUCCESS
;
while
(
pSupporter
->
subgroupIdx
<
pSupporter
->
pSidSet
->
numOfSubSet
)
{
int32_t
start
=
pSupporter
->
pSidSet
->
starterPos
[
pSupporter
->
subgroupIdx
];
int32_t
end
=
pSupporter
->
pSidSet
->
starterPos
[
pSupporter
->
subgroupIdx
+
1
];
int32_t
ret
=
doMergeMetersResultsToGroupRes
(
pSupporter
,
pQuery
,
pRuntimeEnv
,
pSupporter
->
pMeterDataInfo
,
start
,
end
);
ret
=
doMergeMetersResultsToGroupRes
(
pSupporter
,
pQuery
,
pRuntimeEnv
,
pSupporter
->
pMeterDataInfo
,
start
,
end
);
if
(
ret
<
0
)
{
// not enough disk space to save the data into disk
return
-
1
;
}
pSupporter
->
subgroupIdx
+=
1
;
/
* this group generates at least one result, return results */
/
/ this group generates at least one result, return results
if
(
ret
>
0
)
{
break
;
}
...
...
@@ -4754,7 +4760,7 @@ int32_t mergeMetersResultToOneGroups(SMeterQuerySupportObj *pSupporter) {
dTrace
(
"QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms"
,
GET_QINFO_ADDR
(
pQuery
),
pSupporter
->
subgroupIdx
-
1
,
pSupporter
->
pSidSet
->
numOfSubSet
,
taosGetTimestampMs
()
-
st
);
return
pSupporter
->
numOfGroupResultPages
;
return
TSDB_CODE_SUCCESS
;
}
void
copyResToQueryResultBuf
(
SMeterQuerySupportObj
*
pSupporter
,
SQuery
*
pQuery
)
{
...
...
@@ -4762,7 +4768,9 @@ void copyResToQueryResultBuf(SMeterQuerySupportObj *pSupporter, SQuery *pQuery)
pSupporter
->
numOfGroupResultPages
=
0
;
// current results of group has been sent to client, try next group
mergeMetersResultToOneGroups
(
pSupporter
);
if
(
mergeMetersResultToOneGroups
(
pSupporter
)
!=
TSDB_CODE_SUCCESS
)
{
return
;
// failed to save data in the disk
}
// set current query completed
if
(
pSupporter
->
numOfGroupResultPages
==
0
&&
pSupporter
->
subgroupIdx
==
pSupporter
->
pSidSet
->
numOfSubSet
)
{
...
...
@@ -4840,7 +4848,10 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery
}
else
{
// copy data to disk buffer
if
(
buffer
[
0
]
->
numOfElems
==
pQuery
->
pointsToRead
)
{
flushFromResultBuf
(
pSupporter
,
pQuery
,
pRuntimeEnv
);
if
(
flushFromResultBuf
(
pSupporter
,
pQuery
,
pRuntimeEnv
)
!=
TSDB_CODE_SUCCESS
)
{
return
-
1
;
}
resetMergeResultBuf
(
pQuery
,
pCtx
);
}
...
...
@@ -4887,7 +4898,14 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery
}
if
(
buffer
[
0
]
->
numOfElems
!=
0
)
{
// there are data in buffer
flushFromResultBuf
(
pSupporter
,
pQuery
,
pRuntimeEnv
);
if
(
flushFromResultBuf
(
pSupporter
,
pQuery
,
pRuntimeEnv
)
!=
TSDB_CODE_SUCCESS
)
{
dError
(
"QInfo:%p failed to flush data into temp file, abort query"
,
GET_QINFO_ADDR
(
pQuery
),
pSupporter
->
extBufFile
);
tfree
(
pTree
);
tfree
(
pValidMeter
);
tfree
(
posArray
);
return
-
1
;
}
}
int64_t
endt
=
taosGetTimestampMs
();
...
...
@@ -4906,25 +4924,44 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery
return
pSupporter
->
numOfGroupResultPages
;
}
static
void
extendDiskBuf
(
SMeterQuerySupportObj
*
pSupporter
,
int32_t
numOfPages
)
{
static
int32_t
extendDiskBuf
(
const
SQuery
*
pQuery
,
SMeterQuerySupportObj
*
pSupporter
,
int32_t
numOfPages
)
{
assert
(
pSupporter
->
numOfPages
*
DEFAULT_INTERN_BUF_SIZE
==
pSupporter
->
bufSize
);
SQInfo
*
pQInfo
=
(
SQInfo
*
)
GET_QINFO_ADDR
(
pQuery
);
int32_t
ret
=
munmap
(
pSupporter
->
meterOutputMMapBuf
,
pSupporter
->
bufSize
);
pSupporter
->
numOfPages
=
numOfPages
;
// disk-based output buffer is exhausted, try to extend the disk-based buffer
/*
* disk-based output buffer is exhausted, try to extend the disk-based buffer, the available disk space may
* be insufficient
*/
ret
=
ftruncate
(
pSupporter
->
meterOutputFd
,
pSupporter
->
numOfPages
*
DEFAULT_INTERN_BUF_SIZE
);
if
(
ret
!=
0
)
{
perror
(
"error in allocate the disk-based buffer"
);
return
;
dError
(
"QInfo:%p failed to create intermediate result output file:%s. %s"
,
pQInfo
,
pSupporter
->
extBufFile
,
strerror
(
errno
));
pQInfo
->
code
=
-
TSDB_CODE_SERV_NO_DISKSPACE
;
pQInfo
->
killed
=
1
;
return
pQInfo
->
code
;
}
pSupporter
->
bufSize
=
pSupporter
->
numOfPages
*
DEFAULT_INTERN_BUF_SIZE
;
pSupporter
->
meterOutputMMapBuf
=
mmap
(
NULL
,
pSupporter
->
bufSize
,
PROT_READ
|
PROT_WRITE
,
MAP_SHARED
,
pSupporter
->
meterOutputFd
,
0
);
if
(
pSupporter
->
meterOutputMMapBuf
==
MAP_FAILED
)
{
dError
(
"QInfo:%p failed to map temp file: %s. %s"
,
pQInfo
,
pSupporter
->
extBufFile
,
strerror
(
errno
));
pQInfo
->
code
=
-
TSDB_CODE_SERV_OUT_OF_MEMORY
;
pQInfo
->
killed
=
1
;
return
pQInfo
->
code
;
}
return
TSDB_CODE_SUCCESS
;
}
void
flushFromResultBuf
(
SMeterQuerySupportObj
*
pSupporter
,
const
SQuery
*
pQuery
,
const
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
int32_t
flushFromResultBuf
(
SMeterQuerySupportObj
*
pSupporter
,
const
SQuery
*
pQuery
,
const
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
int32_t
numOfMeterResultBufPages
=
pSupporter
->
lastPageId
+
1
;
int64_t
dstSize
=
numOfMeterResultBufPages
*
DEFAULT_INTERN_BUF_SIZE
+
pSupporter
->
groupResultSize
*
(
pSupporter
->
numOfGroupResultPages
+
1
);
...
...
@@ -4935,7 +4972,9 @@ void flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery,
requiredPages
+=
pSupporter
->
numOfMeters
;
}
extendDiskBuf
(
pSupporter
,
requiredPages
);
if
(
extendDiskBuf
(
pQuery
,
pSupporter
,
requiredPages
)
!=
TSDB_CODE_SUCCESS
)
{
return
-
1
;
}
}
char
*
lastPosition
=
pSupporter
->
meterOutputMMapBuf
+
DEFAULT_INTERN_BUF_SIZE
*
numOfMeterResultBufPages
+
...
...
@@ -4949,6 +4988,7 @@ void flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery,
}
pSupporter
->
numOfGroupResultPages
+=
1
;
return
TSDB_CODE_SUCCESS
;
}
void
resetMergeResultBuf
(
SQuery
*
pQuery
,
SQLFunctionCtx
*
pCtx
)
{
...
...
@@ -4966,7 +5006,7 @@ void setMeterDataInfo(SMeterDataInfo *pMeterDataInfo, SMeterObj *pMeterObj, int3
pMeterDataInfo
->
meterOrderIdx
=
meterIdx
;
}
void
doCloseAllOpenedResults
(
SMeterQuerySupportObj
*
pSupporter
)
{
int32_t
doCloseAllOpenedResults
(
SMeterQuerySupportObj
*
pSupporter
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pSupporter
->
runtimeEnv
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
...
...
@@ -4980,11 +5020,20 @@ void doCloseAllOpenedResults(SMeterQuerySupportObj *pSupporter) {
pRuntimeEnv
->
pMeterObj
=
getMeterObj
(
pSupporter
->
pMeterObj
,
pSupporter
->
pSidSet
->
pSids
[
index
]
->
sid
);
assert
(
pRuntimeEnv
->
pMeterObj
==
pMeterInfo
[
i
].
pMeterObj
);
setIntervalQueryExecutionContext
(
pSupporter
,
i
,
pMeterInfo
[
i
].
pMeterQInfo
);
saveResult
(
pSupporter
,
pMeterInfo
[
i
].
pMeterQInfo
,
pMeterInfo
[
i
].
pMeterQInfo
->
lastResRows
);
int32_t
ret
=
setIntervalQueryExecutionContext
(
pSupporter
,
i
,
pMeterInfo
[
i
].
pMeterQInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
ret
;
}
ret
=
saveResult
(
pSupporter
,
pMeterInfo
[
i
].
pMeterQInfo
,
pMeterInfo
[
i
].
pMeterQInfo
->
lastResRows
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
ret
;
}
}
}
}
return
TSDB_CODE_SUCCESS
;
}
void
disableFunctForSuppleScan
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
order
)
{
...
...
@@ -5690,18 +5739,24 @@ void changeMeterQueryInfoForSuppleQuery(SMeterQueryInfo *pMeterQueryInfo, TSKEY
}
}
static
tFilePage
*
allocNewPage
(
SMeterQuerySupportObj
*
pSupporter
,
uint32_t
*
pageId
)
{
static
tFilePage
*
allocNewPage
(
S
Query
*
pQuery
,
S
MeterQuerySupportObj
*
pSupporter
,
uint32_t
*
pageId
)
{
if
(
pSupporter
->
lastPageId
==
pSupporter
->
numOfPages
-
1
)
{
extendDiskBuf
(
pSupporter
,
pSupporter
->
numOfPages
+
pSupporter
->
numOfMeters
);
if
(
extendDiskBuf
(
pQuery
,
pSupporter
,
pSupporter
->
numOfPages
+
pSupporter
->
numOfMeters
)
!=
TSDB_CODE_SUCCESS
)
{
return
NULL
;
}
}
*
pageId
=
(
++
pSupporter
->
lastPageId
);
return
getFilePage
(
pSupporter
,
*
pageId
);
}
tFilePage
*
addDataPageForMeterQueryInfo
(
SMeterQueryInfo
*
pMeterQueryInfo
,
SMeterQuerySupportObj
*
pSupporter
)
{
tFilePage
*
addDataPageForMeterQueryInfo
(
S
Query
*
pQuery
,
S
MeterQueryInfo
*
pMeterQueryInfo
,
SMeterQuerySupportObj
*
pSupporter
)
{
uint32_t
pageId
=
0
;
tFilePage
*
pPage
=
allocNewPage
(
pSupporter
,
&
pageId
);
tFilePage
*
pPage
=
allocNewPage
(
pQuery
,
pSupporter
,
&
pageId
);
if
(
pPage
==
NULL
)
{
// failed to allocate disk-based buffer for intermediate results
return
NULL
;
}
if
(
pMeterQueryInfo
->
numOfPages
>=
pMeterQueryInfo
->
numOfAlloc
)
{
pMeterQueryInfo
->
numOfAlloc
=
pMeterQueryInfo
->
numOfAlloc
<<
1
;
...
...
@@ -6199,46 +6254,53 @@ void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t
}
}
void
setOutputBufferForIntervalQuery
(
SMeterQuerySupportObj
*
pSupporter
,
SMeterQueryInfo
*
pMeterQueryInfo
)
{
int32_t
setOutputBufferForIntervalQuery
(
SMeterQuerySupportObj
*
pSupporter
,
SMeterQueryInfo
*
pMeterQueryInfo
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pSupporter
->
runtimeEnv
;
tFilePage
*
pData
=
NULL
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
// in the first scan, new space needed for results
if
(
pMeterQueryInfo
->
numOfPages
==
0
)
{
pData
=
addDataPageForMeterQueryInfo
(
pMeterQueryInfo
,
pSupporter
);
pData
=
addDataPageForMeterQueryInfo
(
p
Query
,
p
MeterQueryInfo
,
pSupporter
);
}
else
{
int32_t
lastPageId
=
pMeterQueryInfo
->
pageList
[
pMeterQueryInfo
->
numOfPages
-
1
];
pData
=
getFilePage
(
pSupporter
,
lastPageId
);
if
(
pData
->
numOfElems
>=
pRuntimeEnv
->
numOfRowsPerPage
)
{
pData
=
addDataPageForMeterQueryInfo
(
pMeterQueryInfo
,
pSupporter
);
pData
=
addDataPageForMeterQueryInfo
(
pRuntimeEnv
->
pQuery
,
pMeterQueryInfo
,
pSupporter
);
if
(
pData
!=
NULL
)
{
assert
(
pData
->
numOfElems
==
0
);
// number of elements must be 0 for new allocated buffer
}
}
}
if
(
pData
==
NULL
)
{
return
-
1
;
}
for
(
int32_t
i
=
0
;
i
<
pRuntimeEnv
->
pQuery
->
numOfOutputCols
;
++
i
)
{
pRuntimeEnv
->
pCtx
[
i
].
aOutputBuf
=
getOutputResPos
(
pRuntimeEnv
,
pData
,
pData
->
numOfElems
,
i
);
pRuntimeEnv
->
pCtx
[
i
].
resultInfo
=
&
pMeterQueryInfo
->
resultInfo
[
i
];
}
return
TSDB_CODE_SUCCESS
;
}
void
setIntervalQueryExecutionContext
(
SMeterQuerySupportObj
*
pSupporter
,
int32_t
meterIdx
,
int32_t
setIntervalQueryExecutionContext
(
SMeterQuerySupportObj
*
pSupporter
,
int32_t
meterIdx
,
SMeterQueryInfo
*
pMeterQueryInfo
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pSupporter
->
runtimeEnv
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
if
(
IS_MASTER_SCAN
(
pRuntimeEnv
))
{
setOutputBufferForIntervalQuery
(
pSupporter
,
pMeterQueryInfo
);
if
(
setOutputBufferForIntervalQuery
(
pSupporter
,
pMeterQueryInfo
)
!=
TSDB_CODE_SUCCESS
)
{
// not enough disk space or memory buffer for intermediate results
return
-
1
;
}
if
(
pMeterQueryInfo
->
lastResRows
==
0
)
{
initCtxOutputBuf
(
pRuntimeEnv
);
}
// reset the number of iterated elements, once this function is called. since the pCtx for different
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutputCols
;
++
j
)
{
// pRuntimeEnv->pCtx[j].numOfIteratedElems = 0;
}
}
else
{
if
(
pMeterQueryInfo
->
reverseFillRes
)
{
setCtxOutputPointerForSupplementScan
(
pSupporter
,
pMeterQueryInfo
);
...
...
@@ -6249,7 +6311,9 @@ void setIntervalQueryExecutionContext(SMeterQuerySupportObj *pSupporter, int32_t
*
* If the master scan does not produce any results, new spaces needed to be allocated during supplement scan
*/
setOutputBufferForIntervalQuery
(
pSupporter
,
pMeterQueryInfo
);
if
(
setOutputBufferForIntervalQuery
(
pSupporter
,
pMeterQueryInfo
)
!=
TSDB_CODE_SUCCESS
)
{
return
-
1
;
}
}
}
...
...
@@ -6659,14 +6723,14 @@ static void validateResultBuf(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo
}
}
void
saveResult
(
SMeterQuerySupportObj
*
pSupporter
,
SMeterQueryInfo
*
pMeterQueryInfo
,
int32_t
numOfResult
)
{
int32_t
saveResult
(
SMeterQuerySupportObj
*
pSupporter
,
SMeterQueryInfo
*
pMeterQueryInfo
,
int32_t
numOfResult
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pSupporter
->
runtimeEnv
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
// no results generated, do nothing for master scan
if
(
numOfResult
<=
0
)
{
if
(
IS_MASTER_SCAN
(
pRuntimeEnv
))
{
return
;
return
TSDB_CODE_SUCCESS
;
}
else
{
/*
* There is a case that no result generated during the the supplement scan, and during the main
...
...
@@ -6691,7 +6755,7 @@ void saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryI
setCtxOutputPointerForSupplementScan
(
pSupporter
,
pMeterQueryInfo
);
}
return
;
return
TSDB_CODE_SUCCESS
;
}
}
...
...
@@ -6720,7 +6784,9 @@ void saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryI
pMeterQueryInfo
->
numOfRes
+=
numOfResult
;
assert
(
pData
->
numOfElems
<=
pRuntimeEnv
->
numOfRowsPerPage
);
setOutputBufferForIntervalQuery
(
pSupporter
,
pMeterQueryInfo
);
if
(
setOutputBufferForIntervalQuery
(
pSupporter
,
pMeterQueryInfo
)
!=
TSDB_CODE_SUCCESS
)
{
return
-
1
;
}
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutputCols
;
++
i
)
{
resetResultInfo
(
&
pMeterQueryInfo
->
resultInfo
[
i
]);
...
...
@@ -6743,6 +6809,8 @@ void saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryI
tColModelDisplay(cm, outputPage->data, outputPage->numOfElems, pRuntimeEnv->numOfRowsPerPage);
#endif
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
getSubsetNumber
(
SMeterQuerySupportObj
*
pSupporter
)
{
...
...
src/system/detail/src/vnodeQueryProcess.c
浏览文件 @
30975a74
...
...
@@ -157,7 +157,11 @@ static SMeterDataInfo *queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMe
setExecutionContext
(
pSupporter
,
pSupporter
->
pResult
,
k
,
pMeterInfo
[
k
].
groupIdx
,
pMeterQueryInfo
);
}
else
{
setIntervalQueryExecutionContext
(
pSupporter
,
k
,
pMeterQueryInfo
);
int32_t
ret
=
setIntervalQueryExecutionContext
(
pSupporter
,
k
,
pMeterQueryInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
pQInfo
->
killed
=
1
;
return
NULL
;
}
}
qTrace
(
"QInfo:%p vid:%d sid:%d id:%s, query in cache, qrange:%lld-%lld, lastKey:%lld"
,
pQInfo
,
pMeterObj
->
vnode
,
...
...
@@ -306,7 +310,7 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
if
(
pReqMeterDataInfo
==
NULL
)
{
dError
(
"QInfo:%p failed to allocate memory to perform query processing, abort"
,
pQInfo
);
pQInfo
->
code
=
TSDB_CODE_SERV_OUT_OF_MEMORY
;
pQInfo
->
code
=
-
TSDB_CODE_SERV_OUT_OF_MEMORY
;
pQInfo
->
killed
=
1
;
return
NULL
;
}
...
...
@@ -338,7 +342,7 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
dError
(
"QInfo:%p failed to allocate memory to perform query processing, abort"
,
pQInfo
);
tfree
(
pReqMeterDataInfo
);
pQInfo
->
code
=
TSDB_CODE_SERV_OUT_OF_MEMORY
;
pQInfo
->
code
=
-
TSDB_CODE_SERV_OUT_OF_MEMORY
;
pQInfo
->
killed
=
1
;
return
NULL
;
}
...
...
@@ -393,7 +397,12 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
setExecutionContext
(
pSupporter
,
pSupporter
->
pResult
,
pOneMeterDataInfo
->
meterOrderIdx
,
pOneMeterDataInfo
->
groupIdx
,
pMeterQueryInfo
);
}
else
{
// interval query
setIntervalQueryExecutionContext
(
pSupporter
,
pOneMeterDataInfo
->
meterOrderIdx
,
pMeterQueryInfo
);
int32_t
ret
=
setIntervalQueryExecutionContext
(
pSupporter
,
pOneMeterDataInfo
->
meterOrderIdx
,
pMeterQueryInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
tfree
(
pReqMeterDataInfo
);
// error code has been set
pQInfo
->
killed
=
1
;
return
NULL
;
}
}
SCompBlock
*
pBlock
=
pInfoEx
->
pBlock
.
compBlock
;
...
...
@@ -900,7 +909,12 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
dTrace
(
"QInfo:%p main scan completed, elapsed time: %lldms, supplementary scan start, order:%d"
,
pQInfo
,
et
-
st
,
pQuery
->
order
.
order
^
1
);
doCloseAllOpenedResults
(
pSupporter
);
// failed to save all intermediate results into disk, abort further query processing
if
(
doCloseAllOpenedResults
(
pSupporter
)
!=
TSDB_CODE_SUCCESS
)
{
dError
(
"QInfo:%p failed to save intermediate results, abort further query processing"
,
pQInfo
);
return
;
}
doMultiMeterSupplementaryScan
(
pQInfo
);
if
(
isQueryKilled
(
pQuery
))
{
...
...
@@ -911,12 +925,13 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
if
(
pQuery
->
nAggTimeInterval
>
0
)
{
assert
(
pSupporter
->
subgroupIdx
==
0
&&
pSupporter
->
numOfGroupResultPages
==
0
);
mergeMetersResultToOneGroups
(
pSupporter
);
if
(
mergeMetersResultToOneGroups
(
pSupporter
)
==
TSDB_CODE_SUCCESS
)
{
copyResToQueryResultBuf
(
pSupporter
,
pQuery
);
#ifdef _DEBUG_VIEW
displayInterResult
(
pQuery
->
sdata
,
pQuery
,
pQuery
->
sdata
[
0
]
->
len
);
#endif
}
}
else
{
// not a interval query
copyFromGroupBuf
(
pQInfo
,
pSupporter
->
pResult
);
}
...
...
src/system/detail/src/vnodeRead.c
浏览文件 @
30975a74
...
...
@@ -824,11 +824,11 @@ int vnodeRetrieveQueryInfo(void *handle, int *numOfRows, int *rowSize, int16_t *
}
if
(
pQInfo
->
killed
)
{
dTrace
(
"QInfo:%p
it is already
killed, %p, code:%d"
,
pQInfo
,
pQuery
,
pQInfo
->
code
);
dTrace
(
"QInfo:%p
query is
killed, %p, code:%d"
,
pQInfo
,
pQuery
,
pQInfo
->
code
);
if
(
pQInfo
->
code
==
TSDB_CODE_SUCCESS
)
{
return
TSDB_CODE_QUERY_CANCELLED
;
}
else
{
// in case of not TSDB_CODE_SUCCESS, return the code to client
return
pQInfo
->
code
;
return
abs
(
pQInfo
->
code
)
;
}
}
...
...
@@ -838,7 +838,12 @@ int vnodeRetrieveQueryInfo(void *handle, int *numOfRows, int *rowSize, int16_t *
*
timePrec
=
vnodeList
[
pQInfo
->
pObj
->
vnode
].
cfg
.
precision
;
if
(
pQInfo
->
code
<
0
)
return
-
pQInfo
->
code
;
dTrace
(
"QInfo:%p, retrieve data info completed, precision:%d, rowsize:%d, rows:%d, code:%d"
,
pQInfo
,
*
timePrec
,
*
rowSize
,
*
numOfRows
,
pQInfo
->
code
);
if
(
pQInfo
->
code
<
0
)
{
// less than 0 means there are error existed.
return
-
pQInfo
->
code
;
}
return
TSDB_CODE_SUCCESS
;
}
...
...
src/system/detail/src/vnodeShell.c
浏览文件 @
30975a74
...
...
@@ -606,7 +606,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if
(
tsAvailDataDirGB
<
tsMinimalDataDirGB
)
{
dError
(
"server disk space remain %.3f GB, need at least %.3f GB, stop writing"
,
tsAvailDataDirGB
,
tsMinimalDataDirGB
);
code
=
TSDB_CODE_SERV
ER_NO_
SPACE
;
code
=
TSDB_CODE_SERV
_NO_DISK
SPACE
;
goto
_submit_over
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录