Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
afa86981
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
afa86981
编写于
8月 06, 2020
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
TD-1057
上级
8434058a
变更
15
显示空白变更内容
内联
并排
Showing
15 changed file
with
148 addition
and
120 deletion
+148
-120
deps/pthread/config.h
deps/pthread/config.h
+1
-0
src/client/CMakeLists.txt
src/client/CMakeLists.txt
+2
-2
src/client/src/taos.def
src/client/src/taos.def
+13
-1
src/query/CMakeLists.txt
src/query/CMakeLists.txt
+7
-6
src/query/inc/qUtil.h
src/query/inc/qUtil.h
+1
-1
src/query/src/qAst.c
src/query/src/qAst.c
+2
-2
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+78
-64
src/query/src/qExtbuffer.c
src/query/src/qExtbuffer.c
+5
-5
src/query/src/qFill.c
src/query/src/qFill.c
+11
-11
src/query/src/qHistogram.c
src/query/src/qHistogram.c
+3
-3
src/query/src/qParserImpl.c
src/query/src/qParserImpl.c
+3
-3
src/query/src/qPercentile.c
src/query/src/qPercentile.c
+7
-7
src/query/src/qResultbuf.c
src/query/src/qResultbuf.c
+6
-6
src/query/src/qTokenizer.c
src/query/src/qTokenizer.c
+2
-2
src/query/src/qTsbuf.c
src/query/src/qTsbuf.c
+7
-7
未找到文件。
deps/pthread/config.h
浏览文件 @
afa86981
...
...
@@ -8,6 +8,7 @@
*********************************************************************/
/* We're building the pthreads-win32 library */
#undef PTW32_BUILD
#define PTW32_BUILD
/* Do we know about the C type sigset_t? */
...
...
src/client/CMakeLists.txt
浏览文件 @
afa86981
...
...
@@ -37,14 +37,14 @@ ELSEIF (TD_WINDOWS)
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/deps/jni/windows/win32
)
ADD_LIBRARY
(
taos_static STATIC
${
SRC
}
)
TARGET_LINK_LIBRARIES
(
taos_static trpc tutil
)
TARGET_LINK_LIBRARIES
(
taos_static trpc tutil
query
)
# generate dynamic library (*.dll)
ADD_LIBRARY
(
taos SHARED
${
SRC
}
)
IF
(
NOT TD_GODLL
)
SET_TARGET_PROPERTIES
(
taos PROPERTIES LINK_FLAGS /DEF:
${
TD_COMMUNITY_DIR
}
/src/client/src/taos.def
)
ENDIF
()
TARGET_LINK_LIBRARIES
(
taos trpc tutil
)
TARGET_LINK_LIBRARIES
(
taos trpc tutil
query
)
ELSEIF
(
TD_DARWIN
)
SET
(
CMAKE_MACOSX_RPATH 1
)
...
...
src/client/src/taos.def
浏览文件 @
afa86981
EXPORTS
taos_init
taos_cleanup
taos_options
taos_connect
taos_close
taos_stmt_init
taos_stmt_prepare
taos_stmt_bind_param
taos_stmt_add_batch
taos_stmt_execute
taos_stmt_use_result
taos_stmt_close
taos_query
taos_fetch_row
taos_result_precision
taos_free_result
taos_field_count
taos_num_fields
...
...
@@ -13,6 +22,9 @@ taos_fetch_fields
taos_select_db
taos_print_row
taos_stop_query
taos_fetch_block
taos_validate_sql
taos_fetch_lengths
taos_get_server_info
taos_get_client_info
taos_errstr
...
...
@@ -26,5 +38,5 @@ taos_unsubscribe
taos_open_stream
taos_close_stream
taos_fetch_block
taos_
result_precision
taos_
load_table_info
src/query/CMakeLists.txt
浏览文件 @
afa86981
...
...
@@ -5,12 +5,13 @@ INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/tsdb/inc)
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/src/client/inc
)
INCLUDE_DIRECTORIES
(
inc
)
AUX_SOURCE_DIRECTORY
(
src SRC
)
ADD_LIBRARY
(
query
${
SRC
}
)
SET_SOURCE_FILES_PROPERTIES
(
src/sql.c PROPERTIES COMPILE_FLAGS -w
)
IF
(
TD_LINUX
)
AUX_SOURCE_DIRECTORY
(
src SRC
)
ADD_LIBRARY
(
query
${
SRC
}
)
TARGET_LINK_LIBRARIES
(
query tsdb tutil m rt
)
TARGET_LINK_LIBRARIES
(
query tutil m rt
)
ADD_SUBDIRECTORY
(
tests
)
SET_SOURCE_FILES_PROPERTIES
(
src/sql.c PROPERTIES COMPILE_FLAGS -w
)
ELSEIF
(
TD_WINDOWS
)
TARGET_LINK_LIBRARIES
(
query tutil
)
ENDIF
()
src/query/inc/qUtil.h
浏览文件 @
afa86981
...
...
@@ -52,7 +52,7 @@ static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int3
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
// tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pos.pageId);
int32_t
realRowId
=
pResult
->
pos
.
rowId
*
GET_ROW_PARAM_FOR_MULTIOUTPUT
(
pQuery
,
pRuntimeEnv
->
topBotQuery
,
pRuntimeEnv
->
stableQuery
);
int32_t
realRowId
=
(
int32_t
)(
pResult
->
pos
.
rowId
*
GET_ROW_PARAM_FOR_MULTIOUTPUT
(
pQuery
,
pRuntimeEnv
->
topBotQuery
,
pRuntimeEnv
->
stableQuery
)
);
return
((
char
*
)
page
->
data
)
+
pRuntimeEnv
->
offset
[
columnIndex
]
*
pRuntimeEnv
->
numOfRowsPerPage
+
pQuery
->
pSelectExpr
[
columnIndex
].
bytes
*
realRowId
;
}
...
...
src/query/src/qAst.c
浏览文件 @
afa86981
...
...
@@ -1044,7 +1044,7 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) {
}
else
if
(
*
e
==
','
)
{
size_t
len
=
e
-
cond
;
char
*
p
=
exception_malloc
(
len
+
VARSTR_HEADER_SIZE
);
STR_WITH_SIZE_TO_VARSTR
(
p
,
cond
,
len
);
STR_WITH_SIZE_TO_VARSTR
(
p
,
cond
,
(
VarDataLenT
)
len
);
cond
+=
len
;
taosArrayPush
(
pVal
->
arr
,
&
p
);
}
...
...
@@ -1054,7 +1054,7 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) {
size_t
len
=
strlen
(
cond
)
+
VARSTR_HEADER_SIZE
;
char
*
p
=
exception_malloc
(
len
);
STR_WITH_SIZE_TO_VARSTR
(
p
,
cond
,
len
-
VARSTR_HEADER_SIZE
);
STR_WITH_SIZE_TO_VARSTR
(
p
,
cond
,
(
VarDataLenT
)(
len
-
VARSTR_HEADER_SIZE
)
);
taosArrayPush
(
pVal
->
arr
,
&
p
);
}
...
...
src/query/src/qExecutor.c
浏览文件 @
afa86981
...
...
@@ -49,6 +49,11 @@
#define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0}
#define TIME_WINDOW_COPY(_dst, _src) do {\
_dst.skey = _src.skey;\
_dst.ekey = _src.ekey;\
} while (0);
enum
{
// when query starts to execute, this status will set
QUERY_NOT_COMPLETED
=
0x1u
,
...
...
@@ -149,7 +154,7 @@ bool doFilterData(SQuery *pQuery, int32_t elemPos) {
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfFilterCols
;
++
k
)
{
SSingleColumnFilterInfo
*
pFilterInfo
=
&
pQuery
->
pFilterInfo
[
k
];
char
*
pElem
=
pFilterInfo
->
pData
+
pFilterInfo
->
info
.
bytes
*
elemPos
;
char
*
pElem
=
(
char
*
)
pFilterInfo
->
pData
+
pFilterInfo
->
info
.
bytes
*
elemPos
;
if
(
isNull
(
pElem
,
pFilterInfo
->
info
.
type
))
{
return
false
;
}
...
...
@@ -395,7 +400,7 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
// more than the capacity, reallocate the resources
if
(
pWindowResInfo
->
size
>=
pWindowResInfo
->
capacity
)
{
int64_t
newCap
=
pWindowResInfo
->
capacity
*
1
.
5
;
int64_t
newCap
=
(
int64_t
)(
pWindowResInfo
->
capacity
*
1
.
5
f
)
;
char
*
t
=
realloc
(
pWindowResInfo
->
pResult
,
newCap
*
sizeof
(
SWindowResult
));
if
(
t
==
NULL
)
{
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
...
...
@@ -403,14 +408,14 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
pWindowResInfo
->
pResult
=
(
SWindowResult
*
)
t
;
int32_t
inc
=
newCap
-
pWindowResInfo
->
capacity
;
int32_t
inc
=
(
int32_t
)
newCap
-
pWindowResInfo
->
capacity
;
memset
(
&
pWindowResInfo
->
pResult
[
pWindowResInfo
->
capacity
],
0
,
sizeof
(
SWindowResult
)
*
inc
);
for
(
int32_t
i
=
pWindowResInfo
->
capacity
;
i
<
newCap
;
++
i
)
{
createQueryResultInfo
(
pQuery
,
&
pWindowResInfo
->
pResult
[
i
],
pRuntimeEnv
->
stableQuery
,
pRuntimeEnv
->
interBufSize
);
}
pWindowResInfo
->
capacity
=
newCap
;
pWindowResInfo
->
capacity
=
(
int32_t
)
newCap
;
}
// add a new result set for a new group
...
...
@@ -499,7 +504,7 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult
// set the number of rows in current disk page
if
(
pWindowRes
->
pos
.
pageId
==
-
1
)
{
// not allocated yet, allocate new buffer
pWindowRes
->
pos
.
pageId
=
pageId
;
pWindowRes
->
pos
.
rowId
=
pData
->
num
++
;
pWindowRes
->
pos
.
rowId
=
(
int32_t
)(
pData
->
num
++
)
;
assert
(
pWindowRes
->
pos
.
pageId
>=
0
);
}
...
...
@@ -803,7 +808,7 @@ static FORCE_INLINE TSKEY reviseWindowEkey(SQuery *pQuery, STimeWindow *pWindow)
//todo binary search
static
void
*
getDataBlockImpl
(
SArray
*
pDataBlock
,
int32_t
colId
)
{
int32_t
numOfCols
=
taosArrayGetSize
(
pDataBlock
);
int32_t
numOfCols
=
(
int32_t
)
taosArrayGetSize
(
pDataBlock
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
*
p
=
taosArrayGet
(
pDataBlock
,
i
);
...
...
@@ -847,7 +852,7 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
}
// here the pQuery->colList and sas->colList are identical
int32_t
numOfCols
=
taosArrayGetSize
(
pDataBlock
);
int32_t
numOfCols
=
(
int32_t
)
taosArrayGetSize
(
pDataBlock
);
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfCols
;
++
i
)
{
SColumnInfo
*
pColMsg
=
&
pQuery
->
colList
[
i
];
...
...
@@ -1065,7 +1070,7 @@ static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes,
* stage, the remain tables may not have the required column in cache actually. So, the validation of required
* column in cache with the corresponding schema is reinforced.
*/
int32_t
numOfCols
=
taosArrayGetSize
(
pDataBlock
);
int32_t
numOfCols
=
(
int32_t
)
taosArrayGetSize
(
pDataBlock
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
*
p
=
taosArrayGet
(
pDataBlock
,
i
);
...
...
@@ -1089,7 +1094,7 @@ static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, int32_t offset) {
return
TS_JOIN_TAG_NOT_EQUALS
;
}
TSKEY
key
=
*
(
TSKEY
*
)(
pCtx
[
0
].
aInputElemBuf
+
TSDB_KEYSIZE
*
offset
);
TSKEY
key
=
*
(
TSKEY
*
)(
(
char
*
)
pCtx
[
0
].
aInputElemBuf
+
TSDB_KEYSIZE
*
offset
);
#if defined(_DEBUG_VIEW)
printf
(
"elem in comp ts file:%"
PRId64
", key:%"
PRId64
", tag:%"
PRIu64
", query order:%d, ts order:%d, traverse:%d, index:%d
\n
"
,
...
...
@@ -1328,7 +1333,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
numOfRes
=
doCheckQueryCompleted
(
pRuntimeEnv
,
lastKey
,
pWindowResInfo
);
}
else
{
numOfRes
=
getNumOfResult
(
pRuntimeEnv
);
numOfRes
=
(
int32_t
)
getNumOfResult
(
pRuntimeEnv
);
// update the number of output result
if
(
numOfRes
>
0
&&
pQuery
->
checkBuffer
==
1
)
{
...
...
@@ -1406,7 +1411,7 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY
}
}
else
if
(
functionId
==
TSDB_FUNC_INTERP
)
{
SInterpInfoDetail
*
pInterpInfo
=
GET_RES_INFO
(
pCtx
)
->
interResultBuf
;
pInterpInfo
->
type
=
pQuery
->
fillType
;
pInterpInfo
->
type
=
(
int8_t
)
pQuery
->
fillType
;
pInterpInfo
->
ts
=
pQuery
->
window
.
skey
;
pInterpInfo
->
primaryCol
=
(
colId
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
);
...
...
@@ -1705,7 +1710,7 @@ static bool needReverseScan(SQuery *pQuery) {
}
if
(
functionId
==
TSDB_FUNC_LAST
||
functionId
==
TSDB_FUNC_LAST_DST
)
{
int32_t
order
=
pQuery
->
pSelectExpr
[
i
].
base
.
arg
->
argValue
.
i64
;
int32_t
order
=
(
int32_t
)
pQuery
->
pSelectExpr
[
i
].
base
.
arg
->
argValue
.
i64
;
return
order
!=
pQuery
->
order
.
order
;
}
}
...
...
@@ -1914,7 +1919,7 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) {
num
=
128
;
}
else
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
// time window query, allocate one page for each table
size_t
s
=
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
;
num
=
MAX
(
s
,
INITIAL_RESULT_ROWS_VALUE
);
num
=
(
int32_t
)(
MAX
(
s
,
INITIAL_RESULT_ROWS_VALUE
)
);
}
else
{
// for super table query, one page for each subset
num
=
1
;
// pQInfo->pSidSet->numOfSubSet;
}
...
...
@@ -1926,7 +1931,7 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) {
static
void
getIntermediateBufInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
*
ps
,
int32_t
*
rowsize
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
*
rowsize
=
pQuery
->
rowSize
*
GET_ROW_PARAM_FOR_MULTIOUTPUT
(
pQuery
,
pRuntimeEnv
->
topBotQuery
,
pRuntimeEnv
->
stableQuery
);
*
rowsize
=
(
int32_t
)(
pQuery
->
rowSize
*
GET_ROW_PARAM_FOR_MULTIOUTPUT
(
pQuery
,
pRuntimeEnv
->
topBotQuery
,
pRuntimeEnv
->
stableQuery
)
);
int32_t
overhead
=
sizeof
(
tFilePage
);
// one page contains at least two rows
...
...
@@ -1977,8 +1982,8 @@ static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDat
SDataStatis
*
pDataBlockst
=
&
pDataStatis
[
index
];
if
(
pFilterInfo
->
info
.
type
==
TSDB_DATA_TYPE_FLOAT
)
{
float
minval
=
*
(
double
*
)(
&
pDataBlockst
->
min
);
float
maxval
=
*
(
double
*
)(
&
pDataBlockst
->
max
);
float
minval
=
(
float
)(
*
(
double
*
)(
&
pDataBlockst
->
min
)
);
float
maxval
=
(
float
)(
*
(
double
*
)(
&
pDataBlockst
->
max
)
);
for
(
int32_t
i
=
0
;
i
<
pFilterInfo
->
numOfFilters
;
++
i
)
{
if
(
pFilterInfo
->
pFilters
[
i
].
fp
(
&
pFilterInfo
->
pFilters
[
i
],
(
char
*
)
&
minval
,
(
char
*
)
&
maxval
))
{
...
...
@@ -2219,8 +2224,8 @@ static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pB
SResultRec
*
pRec
=
&
pQuery
->
rec
;
if
(
pQuery
->
rec
.
capacity
-
pQuery
->
rec
.
rows
<
pBlockInfo
->
rows
)
{
int32_t
remain
=
pRec
->
capacity
-
pRec
->
rows
;
int32_t
newSize
=
pRec
->
capacity
+
(
pBlockInfo
->
rows
-
remain
);
int32_t
remain
=
(
int32_t
)(
pRec
->
capacity
-
pRec
->
rows
)
;
int32_t
newSize
=
(
int32_t
)(
pRec
->
capacity
+
(
pBlockInfo
->
rows
-
remain
)
);
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
int32_t
bytes
=
pQuery
->
pSelectExpr
[
i
].
bytes
;
...
...
@@ -2399,7 +2404,7 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) {
if
(
pQuery
->
numOfOutput
==
1
&&
pExprInfo
->
base
.
functionId
==
TSDB_FUNC_TS_COMP
)
{
assert
(
pExprInfo
->
base
.
numOfParams
==
1
);
int16_t
tagColId
=
pExprInfo
->
base
.
arg
->
argValue
.
i64
;
int16_t
tagColId
=
(
int16_t
)
pExprInfo
->
base
.
arg
->
argValue
.
i64
;
SColumnInfo
*
pColInfo
=
doGetTagColumnInfoById
(
pQuery
->
tagColList
,
pQuery
->
numOfTags
,
tagColId
);
doSetTagValueInParam
(
tsdb
,
pTable
,
tagColId
,
&
pRuntimeEnv
->
pCtx
[
0
].
tag
,
pColInfo
->
type
,
pColInfo
->
bytes
);
...
...
@@ -2424,7 +2429,7 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) {
pFuncMsg
->
colInfo
.
colIndex
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
{
assert
(
pFuncMsg
->
numOfParams
==
1
);
int16_t
tagColId
=
pExprInfo
->
base
.
arg
->
argValue
.
i64
;
int16_t
tagColId
=
(
int16_t
)
pExprInfo
->
base
.
arg
->
argValue
.
i64
;
SColumnInfo
*
pColInfo
=
doGetTagColumnInfoById
(
pQuery
->
tagColList
,
pQuery
->
numOfTags
,
tagColId
);
doSetTagValueInParam
(
tsdb
,
pTable
,
tagColId
,
&
pRuntimeEnv
->
pCtx
[
0
].
tag
,
pColInfo
->
type
,
pColInfo
->
bytes
);
...
...
@@ -2632,7 +2637,7 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
int64_t
st
=
taosGetTimestampMs
();
int32_t
ret
=
TSDB_CODE_SUCCESS
;
int32_t
numOfGroups
=
GET_NUM_OF_TABLEGROUP
(
pQInfo
);
int32_t
numOfGroups
=
(
int32_t
)(
GET_NUM_OF_TABLEGROUP
(
pQInfo
)
);
while
(
pQInfo
->
groupIndex
<
numOfGroups
)
{
SArray
*
group
=
GET_TABLEGROUP
(
pQInfo
,
pQInfo
->
groupIndex
);
...
...
@@ -2668,9 +2673,9 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
}
// check if all results has been sent to client
int32_t
numOfGroup
=
GET_NUM_OF_TABLEGROUP
(
pQInfo
);
int32_t
numOfGroup
=
(
int32_t
)(
GET_NUM_OF_TABLEGROUP
(
pQInfo
)
);
if
(
pQInfo
->
numOfGroupResultPages
==
0
&&
pQInfo
->
groupIndex
==
numOfGroup
)
{
pQInfo
->
tableIndex
=
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
;
// set query completed
pQInfo
->
tableIndex
=
(
int32_t
)
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
;
// set query completed
return
;
}
}
...
...
@@ -2681,7 +2686,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
int32_t
id
=
getGroupResultId
(
pQInfo
->
groupIndex
-
1
);
SIDList
list
=
getDataBufPagesIdList
(
pResultBuf
,
pQInfo
->
offset
+
id
);
int32_t
size
=
taosArrayGetSize
(
list
);
int32_t
size
=
(
int32_t
)(
taosArrayGetSize
(
list
)
);
int32_t
offset
=
0
;
for
(
int32_t
j
=
0
;
j
<
size
;
++
j
)
{
...
...
@@ -2695,7 +2700,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
}
// rows += pData->num;
offset
+=
pData
->
num
;
offset
+=
(
int32_t
)
pData
->
num
;
}
assert
(
pQuery
->
rec
.
rows
==
0
);
...
...
@@ -2880,7 +2885,7 @@ int32_t flushFromResultBuf(SQInfo *pQInfo) {
int32_t
pageId
=
-
1
;
int32_t
capacity
=
pResultBuf
->
numOfRowsPerPage
;
int32_t
remain
=
pQuery
->
sdata
[
0
]
->
num
;
int32_t
remain
=
(
int32_t
)
pQuery
->
sdata
[
0
]
->
num
;
int32_t
offset
=
0
;
while
(
remain
>
0
)
{
...
...
@@ -2999,7 +3004,7 @@ void disableFuncInReverseScan(SQInfo *pQInfo) {
}
}
int32_t
numOfGroups
=
GET_NUM_OF_TABLEGROUP
(
pQInfo
);
int32_t
numOfGroups
=
(
int32_t
)(
GET_NUM_OF_TABLEGROUP
(
pQInfo
)
);
for
(
int32_t
i
=
0
;
i
<
numOfGroups
;
++
i
)
{
SArray
*
group
=
GET_TABLEGROUP
(
pQInfo
,
i
);
...
...
@@ -3084,7 +3089,7 @@ void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) {
*
* diff function is handled in multi-output function
*/
pRuntimeEnv
->
pCtx
[
j
].
ptsOutputBuf
+=
TSDB_KEYSIZE
*
output
;
pRuntimeEnv
->
pCtx
[
j
].
ptsOutputBuf
=
(
char
*
)
pRuntimeEnv
->
pCtx
[
j
].
ptsOutputBuf
+
TSDB_KEYSIZE
*
output
;
}
RESET_RESULT_INFO
(
pRuntimeEnv
->
pCtx
[
j
].
resultInfo
);
...
...
@@ -3144,7 +3149,7 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) {
}
}
updateNumOfResult
(
pRuntimeEnv
,
pQuery
->
rec
.
rows
);
updateNumOfResult
(
pRuntimeEnv
,
(
int32_t
)
pQuery
->
rec
.
rows
);
}
}
...
...
@@ -3214,10 +3219,13 @@ static SQueryStatusInfo getQueryStatusInfo(SQueryRuntimeEnv *pRuntimeEnv, TSKEY
.
status
=
pQuery
->
status
,
.
windowIndex
=
pRuntimeEnv
->
windowResInfo
.
curIndex
,
.
lastKey
=
start
,
.
w
=
pQuery
->
window
,
.
curWindow
=
{.
skey
=
start
,
.
ekey
=
pTableQueryInfo
->
win
.
ekey
},
};
TIME_WINDOW_COPY
(
info
.
w
,
pQuery
->
window
);
TIME_WINDOW_COPY
(
info
.
curWindow
,
pTableQueryInfo
->
win
);
info
.
curWindow
.
skey
=
start
;
return
info
;
}
...
...
@@ -3246,12 +3254,13 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI
SET_REVERSE_SCAN_FLAG
(
pRuntimeEnv
);
STsdbQueryCond
cond
=
{
.
twindow
=
pQuery
->
window
,
.
order
=
pQuery
->
order
.
order
,
.
colList
=
pQuery
->
colList
,
.
numOfCols
=
pQuery
->
numOfCols
,
};
TIME_WINDOW_COPY
(
cond
.
twindow
,
pQuery
->
window
);
// clean unused handle
if
(
pRuntimeEnv
->
pSecQueryHandle
!=
NULL
)
{
tsdbCleanupQueryHandle
(
pRuntimeEnv
->
pSecQueryHandle
);
...
...
@@ -3327,12 +3336,13 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
}
STsdbQueryCond
cond
=
{
.
twindow
=
qstatus
.
curWindow
,
.
order
=
pQuery
->
order
.
order
,
.
colList
=
pQuery
->
colList
,
.
numOfCols
=
pQuery
->
numOfCols
,
};
TIME_WINDOW_COPY
(
cond
.
twindow
,
qstatus
.
curWindow
);
if
(
pRuntimeEnv
->
pSecQueryHandle
!=
NULL
)
{
tsdbCleanupQueryHandle
(
pRuntimeEnv
->
pSecQueryHandle
);
}
...
...
@@ -3395,7 +3405,7 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
* set the number of output results for group by normal columns, the number of output rows usually is 1 except
* the top and bottom query
*/
buf
->
numOfRows
=
getNumOfResult
(
pRuntimeEnv
);
buf
->
numOfRows
=
(
uint16_t
)
getNumOfResult
(
pRuntimeEnv
);
}
}
else
{
...
...
@@ -3697,7 +3707,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_
* to SQuery object's result buffer
*/
if
(
numOfRowsToCopy
>
pQuery
->
rec
.
capacity
-
numOfResult
)
{
numOfRowsToCopy
=
pQuery
->
rec
.
capacity
-
numOfResult
;
numOfRowsToCopy
=
(
int32_t
)
pQuery
->
rec
.
capacity
-
numOfResult
;
pQInfo
->
offset
+=
numOfRowsToCopy
;
}
else
{
pQInfo
->
offset
=
0
;
...
...
@@ -3811,7 +3821,7 @@ bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) {
* first result row in the actual result set will fill nothing.
*/
if
(
Q_STATUS_EQUAL
(
pQuery
->
status
,
QUERY_COMPLETED
))
{
int32_t
numOfTotal
=
getFilledNumOfRes
(
pFillInfo
,
pQuery
->
window
.
ekey
,
pQuery
->
rec
.
capacity
);
int32_t
numOfTotal
=
(
int32_t
)
getFilledNumOfRes
(
pFillInfo
,
pQuery
->
window
.
ekey
,
(
int32_t
)
pQuery
->
rec
.
capacity
);
return
numOfTotal
>
0
;
}
...
...
@@ -3869,7 +3879,7 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int
SFillInfo
*
pFillInfo
=
pRuntimeEnv
->
pFillInfo
;
while
(
1
)
{
int32_t
ret
=
taosGenerateDataBlock
(
pFillInfo
,
(
tFilePage
**
)
pQuery
->
sdata
,
pQuery
->
rec
.
capacity
);
int32_t
ret
=
(
int32_t
)
taosGenerateDataBlock
(
pFillInfo
,
(
tFilePage
**
)
pQuery
->
sdata
,
(
int32_t
)
pQuery
->
rec
.
capacity
);
// todo apply limit output function
/* reached the start position of according to offset value, return immediately */
...
...
@@ -3882,7 +3892,7 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int
qDebug
(
"QInfo:%p initial numOfRows:%d, generate filled result:%d rows, offset:%"
PRId64
". Discard due to offset, remain:%"
PRId64
", new offset:%d"
,
pQInfo
,
pFillInfo
->
numOfRows
,
ret
,
pQuery
->
limit
.
offset
,
ret
-
pQuery
->
limit
.
offset
,
0
);
ret
-=
pQuery
->
limit
.
offset
;
ret
-=
(
int32_t
)
pQuery
->
limit
.
offset
;
// todo !!!!there exactly number of interpo is not valid.
// todo refactor move to the beginning of buffer
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
...
...
@@ -3931,9 +3941,9 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBloc
}
if
(
QUERY_IS_ASC_QUERY
(
pQuery
))
{
pQuery
->
pos
=
pQuery
->
limit
.
offset
;
pQuery
->
pos
=
(
int32_t
)
pQuery
->
limit
.
offset
;
}
else
{
pQuery
->
pos
=
pBlockInfo
->
rows
-
pQuery
->
limit
.
offset
-
1
;
pQuery
->
pos
=
pBlockInfo
->
rows
-
(
int32_t
)
pQuery
->
limit
.
offset
-
1
;
}
assert
(
pQuery
->
pos
>=
0
&&
pQuery
->
pos
<=
pBlockInfo
->
rows
-
1
);
...
...
@@ -4138,12 +4148,13 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
}
STsdbQueryCond
cond
=
{
.
twindow
=
pQuery
->
window
,
.
order
=
pQuery
->
order
.
order
,
.
colList
=
pQuery
->
colList
,
.
numOfCols
=
pQuery
->
numOfCols
,
};
TIME_WINDOW_COPY
(
cond
.
twindow
,
pQuery
->
window
);
if
(
!
isSTableQuery
&&
(
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
==
1
)
&&
(
cond
.
order
==
TSDB_ORDER_ASC
)
...
...
@@ -4177,7 +4188,7 @@ static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) {
SExprInfo
*
pExprInfo
=
&
pQuery
->
pSelectExpr
[
i
];
pFillCol
[
i
].
col
.
bytes
=
pExprInfo
->
bytes
;
pFillCol
[
i
].
col
.
type
=
pExprInfo
->
type
;
pFillCol
[
i
].
col
.
type
=
(
int8_t
)
pExprInfo
->
type
;
pFillCol
[
i
].
col
.
offset
=
offset
;
pFillCol
[
i
].
flag
=
TSDB_COL_NORMAL
;
// always be ta normal column for table query
pFillCol
[
i
].
functionId
=
pExprInfo
->
base
.
functionId
;
...
...
@@ -4248,7 +4259,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
threshold
=
4000
;
}
else
{
type
=
TSDB_DATA_TYPE_INT
;
// group id
threshold
=
GET_NUM_OF_TABLEGROUP
(
pQInfo
);
threshold
=
(
int32_t
)(
GET_NUM_OF_TABLEGROUP
(
pQInfo
)
);
if
(
threshold
<
8
)
{
threshold
=
8
;
}
...
...
@@ -4288,8 +4299,8 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
TSKEY
ek
=
MAX
(
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
);
getAlignQueryTimeWindow
(
pQuery
,
pQuery
->
window
.
skey
,
sk
,
ek
,
&
w
);
pRuntimeEnv
->
pFillInfo
=
taosInitFillInfo
(
pQuery
->
order
.
order
,
w
.
skey
,
0
,
pQuery
->
rec
.
capacity
,
pQuery
->
numOfOutput
,
pQuery
->
slidingTime
,
pQuery
->
slidingTimeUnit
,
pQuery
->
precision
,
pRuntimeEnv
->
pFillInfo
=
taosInitFillInfo
(
pQuery
->
order
.
order
,
w
.
skey
,
0
,
(
int32_t
)
pQuery
->
rec
.
capacity
,
pQuery
->
numOfOutput
,
pQuery
->
slidingTime
,
pQuery
->
slidingTimeUnit
,
(
int8_t
)
pQuery
->
precision
,
pQuery
->
fillType
,
pColInfo
);
}
...
...
@@ -4471,12 +4482,13 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
numOfGroups
,
group
);
STsdbQueryCond
cond
=
{
.
twindow
=
pQuery
->
window
,
.
colList
=
pQuery
->
colList
,
.
order
=
pQuery
->
order
.
order
,
.
numOfCols
=
pQuery
->
numOfCols
,
};
TIME_WINDOW_COPY
(
cond
.
twindow
,
pQuery
->
window
);
SArray
*
g1
=
taosArrayInit
(
1
,
POINTER_BYTES
);
SArray
*
tx
=
taosArrayClone
(
group
);
taosArrayPush
(
g1
,
&
tx
);
...
...
@@ -4543,12 +4555,13 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
qDebug
(
"QInfo:%p group by normal columns group:%d, total group:%zu"
,
pQInfo
,
pQInfo
->
groupIndex
,
numOfGroups
);
STsdbQueryCond
cond
=
{
.
twindow
=
pQuery
->
window
,
.
colList
=
pQuery
->
colList
,
.
order
=
pQuery
->
order
.
order
,
.
numOfCols
=
pQuery
->
numOfCols
,
};
TIME_WINDOW_COPY
(
cond
.
twindow
,
pQuery
->
window
);
SArray
*
g1
=
taosArrayInit
(
1
,
POINTER_BYTES
);
SArray
*
tx
=
taosArrayClone
(
group
);
taosArrayPush
(
g1
,
&
tx
);
...
...
@@ -4662,7 +4675,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
// the limitation of output result is reached, set the query completed
if
(
limitResults
(
pRuntimeEnv
))
{
pQInfo
->
tableIndex
=
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
;
pQInfo
->
tableIndex
=
(
int32_t
)
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
;
break
;
}
...
...
@@ -4748,12 +4761,13 @@ static void doSaveContext(SQInfo *pQInfo) {
}
STsdbQueryCond
cond
=
{
.
twindow
=
pQuery
->
window
,
.
order
=
pQuery
->
order
.
order
,
.
colList
=
pQuery
->
colList
,
.
numOfCols
=
pQuery
->
numOfCols
,
};
TIME_WINDOW_COPY
(
cond
.
twindow
,
pQuery
->
window
);
// clean unused handle
if
(
pRuntimeEnv
->
pSecQueryHandle
!=
NULL
)
{
tsdbCleanupQueryHandle
(
pRuntimeEnv
->
pSecQueryHandle
);
...
...
@@ -4985,7 +4999,7 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start)
// maxOutput <= 0, means current query does not generate any results
int32_t
numOfClosed
=
numOfClosedTimeWindow
(
&
pRuntimeEnv
->
windowResInfo
);
int32_t
c
=
MIN
(
numOfClosed
,
pQuery
->
limit
.
offset
);
int32_t
c
=
(
int32_t
)(
MIN
(
numOfClosed
,
pQuery
->
limit
.
offset
)
);
clearFirstNTimeWindow
(
pRuntimeEnv
,
c
);
pQuery
->
limit
.
offset
-=
c
;
}
...
...
@@ -5029,7 +5043,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
limitResults
(
pRuntimeEnv
);
break
;
}
else
{
taosFillSetStartInfo
(
pRuntimeEnv
->
pFillInfo
,
pQuery
->
rec
.
rows
,
pQuery
->
window
.
ekey
);
taosFillSetStartInfo
(
pRuntimeEnv
->
pFillInfo
,
(
int32_t
)
pQuery
->
rec
.
rows
,
pQuery
->
window
.
ekey
);
taosFillCopyInputDataFromFilePage
(
pRuntimeEnv
->
pFillInfo
,
(
tFilePage
**
)
pQuery
->
sdata
);
numOfFilled
=
0
;
...
...
@@ -5536,7 +5550,7 @@ static int32_t createQFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo *
}
}
int32_t
param
=
pExprs
[
i
].
base
.
arg
[
0
].
argValue
.
i64
;
int32_t
param
=
(
int32_t
)
pExprs
[
i
].
base
.
arg
[
0
].
argValue
.
i64
;
if
(
getResultDataInfo
(
type
,
bytes
,
pExprs
[
i
].
base
.
functionId
,
param
,
&
pExprs
[
i
].
type
,
&
pExprs
[
i
].
bytes
,
&
pExprs
[
i
].
interBytes
,
0
,
isSuperTable
)
!=
TSDB_CODE_SUCCESS
)
{
taosTFree
(
pExprs
);
...
...
@@ -5561,7 +5575,7 @@ static int32_t createQFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo *
SColumnInfo
*
pCol
=
&
pQueryMsg
->
colList
[
j
];
int32_t
ret
=
getResultDataInfo
(
pCol
->
type
,
pCol
->
bytes
,
functId
,
pExprs
[
i
].
base
.
arg
[
0
].
argValue
.
i64
,
getResultDataInfo
(
pCol
->
type
,
pCol
->
bytes
,
functId
,
(
int32_t
)
pExprs
[
i
].
base
.
arg
[
0
].
argValue
.
i64
,
&
pExprs
[
i
].
type
,
&
pExprs
[
i
].
bytes
,
&
pExprs
[
i
].
interBytes
,
tagLen
,
isSuperTable
);
assert
(
ret
==
TSDB_CODE_SUCCESS
);
}
...
...
@@ -5730,7 +5744,7 @@ static void freeQInfo(SQInfo *pQInfo);
static
void
calResultBufSize
(
SQuery
*
pQuery
)
{
const
int32_t
RESULT_MSG_MIN_SIZE
=
1024
*
(
1024
+
512
);
// bytes
const
int32_t
RESULT_MSG_MIN_ROWS
=
8192
;
const
float
RESULT_THRESHOLD_RATIO
=
0
.
85
;
const
float
RESULT_THRESHOLD_RATIO
=
0
.
85
f
;
if
(
isProjQuery
(
pQuery
))
{
int32_t
numOfRes
=
RESULT_MSG_MIN_SIZE
/
pQuery
->
rowSize
;
...
...
@@ -5739,10 +5753,10 @@ static void calResultBufSize(SQuery* pQuery) {
}
pQuery
->
rec
.
capacity
=
numOfRes
;
pQuery
->
rec
.
threshold
=
numOfRes
*
RESULT_THRESHOLD_RATIO
;
pQuery
->
rec
.
threshold
=
(
int32_t
)(
numOfRes
*
RESULT_THRESHOLD_RATIO
)
;
}
else
{
// in case of non-prj query, a smaller output buffer will be used.
pQuery
->
rec
.
capacity
=
4096
;
pQuery
->
rec
.
threshold
=
pQuery
->
rec
.
capacity
*
RESULT_THRESHOLD_RATIO
;
pQuery
->
rec
.
threshold
=
(
int32_t
)(
pQuery
->
rec
.
capacity
*
RESULT_THRESHOLD_RATIO
)
;
}
}
...
...
@@ -5871,7 +5885,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
window
.
skey
=
pQueryMsg
->
window
.
skey
;
}
void
*
buf
=
pQInfo
->
pBuf
+
index
*
sizeof
(
STableQueryInfo
);
void
*
buf
=
(
char
*
)
pQInfo
->
pBuf
+
index
*
sizeof
(
STableQueryInfo
);
STableQueryInfo
*
item
=
createTableQueryInfo
(
&
pQInfo
->
runtimeEnv
,
pTable
,
window
,
buf
);
if
(
item
==
NULL
)
{
goto
_cleanup
;
...
...
@@ -6026,7 +6040,7 @@ static void freeQInfo(SQInfo *pQInfo) {
// todo refactor, extract method to destroytableDataInfo
if
(
pQInfo
->
tableqinfoGroupInfo
.
pGroupList
!=
NULL
)
{
int32_t
numOfGroups
=
GET_NUM_OF_TABLEGROUP
(
pQInfo
);
int32_t
numOfGroups
=
(
int32_t
)(
GET_NUM_OF_TABLEGROUP
(
pQInfo
)
);
for
(
int32_t
i
=
0
;
i
<
numOfGroups
;
++
i
)
{
SArray
*
p
=
GET_TABLEGROUP
(
pQInfo
,
i
);
...
...
@@ -6129,7 +6143,7 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
setQueryStatus
(
pQuery
,
QUERY_OVER
);
}
}
else
{
doCopyQueryResultToMsg
(
pQInfo
,
pQuery
->
rec
.
rows
,
data
);
doCopyQueryResultToMsg
(
pQInfo
,
(
int32_t
)
pQuery
->
rec
.
rows
,
data
);
}
pQuery
->
rec
.
total
+=
pQuery
->
rec
.
rows
;
...
...
@@ -6423,7 +6437,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
size
+=
sizeof
(
int32_t
);
size
+=
sizeof
(
STableIdInfo
)
*
taosArrayGetSize
(
pQInfo
->
arrTableIdInfo
);
*
contLen
=
size
+
sizeof
(
SRetrieveTableRsp
);
*
contLen
=
(
int32_t
)(
size
+
sizeof
(
SRetrieveTableRsp
)
);
// todo proper handle failed to allocate memory,
// current solution only avoid crash, but cannot return error code to client
...
...
@@ -6432,7 +6446,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
(
*
pRsp
)
->
numOfRows
=
htonl
(
pQuery
->
rec
.
rows
);
(
*
pRsp
)
->
numOfRows
=
htonl
(
(
int32_t
)
pQuery
->
rec
.
rows
);
int32_t
code
=
pQInfo
->
code
;
if
(
code
==
TSDB_CODE_SUCCESS
)
{
...
...
@@ -6567,15 +6581,15 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
*
(
int64_t
*
)
pQuery
->
sdata
[
0
]
->
data
=
num
;
count
=
1
;
pQInfo
->
tableIndex
=
num
;
//set query completed
pQInfo
->
tableIndex
=
(
int32_t
)
num
;
//set query completed
qDebug
(
"QInfo:%p create count(tbname) query, res:%d rows:1"
,
pQInfo
,
count
);
}
else
{
// return only the tags|table name etc.
count
=
0
;
SSchema
tbnameSchema
=
tGetTableNameColumnSchema
();
int32_t
maxNumOfTables
=
pQuery
->
rec
.
capacity
;
int32_t
maxNumOfTables
=
(
int32_t
)
pQuery
->
rec
.
capacity
;
if
(
pQuery
->
limit
.
limit
>=
0
&&
pQuery
->
limit
.
limit
<
pQuery
->
rec
.
capacity
)
{
maxNumOfTables
=
pQuery
->
limit
.
limit
;
maxNumOfTables
=
(
int32_t
)
pQuery
->
limit
.
limit
;
}
while
(
pQInfo
->
tableIndex
<
num
&&
count
<
maxNumOfTables
)
{
...
...
src/query/src/qExtbuffer.c
浏览文件 @
afa86981
...
...
@@ -172,7 +172,7 @@ int16_t tExtMemBufferPut(tExtMemBuffer *pMemBuffer, void *data, int32_t numOfRow
pMemBuffer
->
numOfElemsInBuffer
+=
numOfRows
;
pMemBuffer
->
numOfTotalElems
+=
numOfRows
;
}
else
{
int32_t
numOfRemainEntries
=
pMemBuffer
->
numOfElemsPerPage
-
pLast
->
item
.
num
;
int32_t
numOfRemainEntries
=
pMemBuffer
->
numOfElemsPerPage
-
(
int32_t
)
pLast
->
item
.
num
;
tColModelAppend
(
pMemBuffer
->
pColumnModel
,
&
pLast
->
item
,
data
,
0
,
numOfRemainEntries
,
numOfRows
);
pMemBuffer
->
numOfElemsInBuffer
+=
numOfRemainEntries
;
...
...
@@ -270,7 +270,7 @@ int32_t tExtMemBufferFlush(tExtMemBuffer *pMemBuffer) {
return
ret
;
}
pMemBuffer
->
fileMeta
.
numOfElemsInFile
+=
first
->
item
.
num
;
pMemBuffer
->
fileMeta
.
numOfElemsInFile
+=
(
uint32_t
)
first
->
item
.
num
;
pMemBuffer
->
fileMeta
.
nFileSize
+=
1
;
tFilePagesItem
*
ptmp
=
first
;
...
...
@@ -322,7 +322,7 @@ void tExtMemBufferClear(tExtMemBuffer *pMemBuffer) {
}
bool
tExtMemBufferLoadData
(
tExtMemBuffer
*
pMemBuffer
,
tFilePage
*
pFilePage
,
int32_t
flushoutId
,
int32_t
pageIdx
)
{
if
(
flushoutId
<
0
||
flushoutId
>
pMemBuffer
->
fileMeta
.
flushoutData
.
nLength
)
{
if
(
flushoutId
<
0
||
flushoutId
>
(
int32_t
)
pMemBuffer
->
fileMeta
.
flushoutData
.
nLength
)
{
return
false
;
}
...
...
@@ -1011,8 +1011,8 @@ void tColModelErase(SColumnModel *pModel, tFilePage *inputBuffer, int32_t blockC
}
int32_t
removed
=
e
-
s
+
1
;
int32_t
remain
=
inputBuffer
->
num
-
removed
;
int32_t
secPart
=
inputBuffer
->
num
-
e
-
1
;
int32_t
remain
=
(
int32_t
)
inputBuffer
->
num
-
removed
;
int32_t
secPart
=
(
int32_t
)
inputBuffer
->
num
-
e
-
1
;
/* start from the second column */
for
(
int32_t
i
=
0
;
i
<
pModel
->
numOfCols
;
++
i
)
{
...
...
src/query/src/qFill.c
浏览文件 @
afa86981
...
...
@@ -185,34 +185,34 @@ static double linearInterpolationImpl(double v1, double v2, double k1, double k2
int
taosDoLinearInterpolation
(
int32_t
type
,
SPoint
*
point1
,
SPoint
*
point2
,
SPoint
*
point
)
{
switch
(
type
)
{
case
TSDB_DATA_TYPE_INT
:
{
*
(
int32_t
*
)
point
->
val
=
(
int32_t
)
linearInterpolationImpl
(
*
(
int32_t
*
)
point1
->
val
,
*
(
int32_t
*
)
point2
->
val
,
point1
->
key
,
point2
->
key
,
point
->
key
);
*
(
int32_t
*
)
point
->
val
=
(
int32_t
)
linearInterpolationImpl
(
*
(
int32_t
*
)
point1
->
val
,
*
(
int32_t
*
)
point2
->
val
,
(
double
)
point1
->
key
,
(
double
)
point2
->
key
,
(
double
)
point
->
key
);
break
;
}
case
TSDB_DATA_TYPE_FLOAT
:
{
*
(
float
*
)
point
->
val
=
linearInterpolationImpl
(
*
(
float
*
)
point1
->
val
,
*
(
float
*
)
point2
->
val
,
point1
->
key
,
point2
->
key
,
point
->
key
);
*
(
float
*
)
point
->
val
=
(
float
)
linearInterpolationImpl
(
*
(
float
*
)
point1
->
val
,
*
(
float
*
)
point2
->
val
,
(
double
)
point1
->
key
,
(
double
)
point2
->
key
,
(
double
)
point
->
key
);
break
;
};
case
TSDB_DATA_TYPE_DOUBLE
:
{
*
(
double
*
)
point
->
val
=
linearInterpolationImpl
(
*
(
double
*
)
point1
->
val
,
*
(
double
*
)
point2
->
val
,
point1
->
key
,
point2
->
key
,
point
->
key
);
linearInterpolationImpl
(
*
(
double
*
)
point1
->
val
,
*
(
double
*
)
point2
->
val
,
(
double
)
point1
->
key
,
(
double
)
point2
->
key
,
(
double
)
point
->
key
);
break
;
};
case
TSDB_DATA_TYPE_TIMESTAMP
:
case
TSDB_DATA_TYPE_BIGINT
:
{
*
(
int64_t
*
)
point
->
val
=
(
int64_t
)
linearInterpolationImpl
(
*
(
int64_t
*
)
point1
->
val
,
*
(
int64_t
*
)
point2
->
val
,
point1
->
key
,
point2
->
key
,
point
->
key
);
*
(
int64_t
*
)
point
->
val
=
(
int64_t
)
linearInterpolationImpl
((
double
)(
*
(
int64_t
*
)
point1
->
val
),
(
double
)(
*
(
int64_t
*
)
point2
->
val
),
(
double
)
point1
->
key
,
(
double
)
point2
->
key
,
(
double
)
point
->
key
);
break
;
};
case
TSDB_DATA_TYPE_SMALLINT
:
{
*
(
int16_t
*
)
point
->
val
=
(
int16_t
)
linearInterpolationImpl
(
*
(
int16_t
*
)
point1
->
val
,
*
(
int16_t
*
)
point2
->
val
,
point1
->
key
,
point2
->
key
,
point
->
key
);
*
(
int16_t
*
)
point
->
val
=
(
int16_t
)
linearInterpolationImpl
(
*
(
int16_t
*
)
point1
->
val
,
*
(
int16_t
*
)
point2
->
val
,
(
double
)
point1
->
key
,
(
double
)
point2
->
key
,
(
double
)
point
->
key
);
break
;
};
case
TSDB_DATA_TYPE_TINYINT
:
{
*
(
int8_t
*
)
point
->
val
=
(
int8_t
)
linearInterpolationImpl
(
*
(
int8_t
*
)
point1
->
val
,
*
(
int8_t
*
)
point2
->
val
,
point1
->
key
,
point2
->
key
,
point
->
key
);
linearInterpolationImpl
(
*
(
int8_t
*
)
point1
->
val
,
*
(
int8_t
*
)
point2
->
val
,
(
double
)
point1
->
key
,
(
double
)
point2
->
key
,
(
double
)
point
->
key
);
break
;
};
default:
{
...
...
@@ -467,7 +467,7 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu
int64_t
taosGenerateDataBlock
(
SFillInfo
*
pFillInfo
,
tFilePage
**
output
,
int32_t
capacity
)
{
int32_t
remain
=
taosNumOfRemainRows
(
pFillInfo
);
// todo use iterator?
int32_t
rows
=
getFilledNumOfRes
(
pFillInfo
,
pFillInfo
->
endKey
,
capacity
);
int32_t
rows
=
(
int32_t
)
getFilledNumOfRes
(
pFillInfo
,
pFillInfo
->
endKey
,
capacity
);
int32_t
numOfRes
=
generateDataBlockImpl
(
pFillInfo
,
output
,
remain
,
rows
,
pFillInfo
->
pData
);
assert
(
numOfRes
==
rows
);
...
...
src/query/src/qHistogram.c
浏览文件 @
afa86981
...
...
@@ -482,10 +482,10 @@ int64_t tHistogramSum(SHistogramInfo* pHisto, double v) {
}
}
double
m1
=
pHisto
->
elems
[
slotIdx
].
num
;
double
m1
=
(
double
)
pHisto
->
elems
[
slotIdx
].
num
;
double
v1
=
pHisto
->
elems
[
slotIdx
].
val
;
double
m2
=
pHisto
->
elems
[
slotIdx
+
1
].
num
;
double
m2
=
(
double
)
pHisto
->
elems
[
slotIdx
+
1
].
num
;
double
v2
=
pHisto
->
elems
[
slotIdx
+
1
].
val
;
double
estNum
=
m1
+
(
m2
-
m1
)
*
(
v
-
v1
)
/
(
v2
-
v1
);
...
...
@@ -538,7 +538,7 @@ double* tHistogramUniform(SHistogramInfo* pHisto, double* ratio, int32_t num) {
pVal
[
i
]
=
pHisto
->
elems
[
j
].
val
;
}
double
start
=
pHisto
->
elems
[
j
].
num
;
double
start
=
(
double
)
pHisto
->
elems
[
j
].
num
;
double
range
=
pHisto
->
elems
[
j
+
1
].
num
-
start
;
if
(
range
==
0
)
{
...
...
src/query/src/qParserImpl.c
浏览文件 @
afa86981
...
...
@@ -158,7 +158,7 @@ tSQLExpr *tSQLExprCreateFunction(tSQLExprList *pList, SSQLToken *pFuncToken, SSQ
pExpr
->
nSQLOptr
=
optType
;
pExpr
->
pParam
=
pList
;
int32_t
len
=
(
endToken
->
z
+
endToken
->
n
)
-
pFuncToken
->
z
;
int32_t
len
=
(
int32_t
)((
endToken
->
z
+
endToken
->
n
)
-
pFuncToken
->
z
)
;
pExpr
->
operand
.
z
=
pFuncToken
->
z
;
pExpr
->
operand
.
n
=
len
;
// raw field name
...
...
@@ -468,7 +468,7 @@ void tSQLSetColumnInfo(TAOS_FIELD *pField, SSQLToken *pName, TAOS_FIELD *pType)
int32_t
maxLen
=
sizeof
(
pField
->
name
)
/
sizeof
(
pField
->
name
[
0
]);
// truncate the column name
if
(
pName
->
n
>=
maxLen
)
{
if
(
(
int32_t
)
pName
->
n
>=
maxLen
)
{
pName
->
n
=
maxLen
-
1
;
}
...
...
@@ -524,7 +524,7 @@ SQuerySQL *tSetQuerySQLElems(SSQLToken *pSelectToken, tSQLExprList *pSelection,
SQuerySQL
*
pQuery
=
calloc
(
1
,
sizeof
(
SQuerySQL
));
pQuery
->
selectToken
=
*
pSelectToken
;
pQuery
->
selectToken
.
n
=
strlen
(
pQuery
->
selectToken
.
z
);
// all later sql string are belonged to the stream sql
pQuery
->
selectToken
.
n
=
(
uint32_t
)
strlen
(
pQuery
->
selectToken
.
z
);
// all later sql string are belonged to the stream sql
pQuery
->
pSelection
=
pSelection
;
pQuery
->
from
=
pFrom
;
...
...
src/query/src/qPercentile.c
浏览文件 @
afa86981
...
...
@@ -55,18 +55,18 @@ static tFilePage *loadIntoBucketFromDisk(tMemBucket *pMemBucket, int32_t segIdx,
// load data in disk to memory
tFilePage
*
pPage
=
(
tFilePage
*
)
calloc
(
1
,
pMemBuffer
->
pageSize
);
for
(
int32_t
i
=
0
;
i
<
pMemBuffer
->
fileMeta
.
flushoutData
.
nLength
;
++
i
)
{
for
(
u
int32_t
i
=
0
;
i
<
pMemBuffer
->
fileMeta
.
flushoutData
.
nLength
;
++
i
)
{
tFlushoutInfo
*
pFlushInfo
=
&
pMemBuffer
->
fileMeta
.
flushoutData
.
pFlushoutInfo
[
i
];
int32_t
ret
=
fseek
(
pMemBuffer
->
file
,
pFlushInfo
->
startPageId
*
pMemBuffer
->
pageSize
,
SEEK_SET
);
UNUSED
(
ret
);
for
(
uint32_t
j
=
0
;
j
<
pFlushInfo
->
numOfPages
;
++
j
)
{
ret
=
fread
(
pPage
,
pMemBuffer
->
pageSize
,
1
,
pMemBuffer
->
file
);
ret
=
(
int32_t
)
fread
(
pPage
,
pMemBuffer
->
pageSize
,
1
,
pMemBuffer
->
file
);
UNUSED
(
ret
);
assert
(
pPage
->
num
>
0
);
tColModelAppend
(
pDesc
->
pColumnModel
,
buffer
,
pPage
->
data
,
0
,
pPage
->
num
,
pPage
->
num
);
tColModelAppend
(
pDesc
->
pColumnModel
,
buffer
,
pPage
->
data
,
0
,
(
int32_t
)
pPage
->
num
,
(
int32_t
)
pPage
->
num
);
printf
(
"id: %d count: %"
PRIu64
"
\n
"
,
j
,
buffer
->
num
);
}
}
...
...
@@ -78,12 +78,12 @@ static tFilePage *loadIntoBucketFromDisk(tMemBucket *pMemBucket, int32_t segIdx,
// load data in pMemBuffer to buffer
tFilePagesItem
*
pListItem
=
pMemBuffer
->
pHead
;
while
(
pListItem
!=
NULL
)
{
tColModelAppend
(
pDesc
->
pColumnModel
,
buffer
,
pListItem
->
item
.
data
,
0
,
pListItem
->
item
.
num
,
pListItem
->
item
.
num
);
tColModelAppend
(
pDesc
->
pColumnModel
,
buffer
,
pListItem
->
item
.
data
,
0
,
(
int32_t
)
pListItem
->
item
.
num
,
(
int32_t
)
pListItem
->
item
.
num
);
pListItem
=
pListItem
->
pNext
;
}
tColDataQSort
(
pDesc
,
buffer
->
num
,
0
,
buffer
->
num
-
1
,
buffer
->
data
,
TSDB_ORDER_ASC
);
tColDataQSort
(
pDesc
,
(
int32_t
)
buffer
->
num
,
0
,
(
int32_t
)
buffer
->
num
-
1
,
buffer
->
data
,
TSDB_ORDER_ASC
);
pDesc
->
pColumnModel
->
capacity
=
oldCapacity
;
// restore value
return
buffer
;
...
...
@@ -883,7 +883,7 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction)
if
(
sz
!=
pMemBuffer
->
pageSize
)
{
uError
(
"MemBucket:%p, read tmp file %s failed"
,
pMemBucket
,
pMemBuffer
->
path
);
}
else
{
tMemBucketPut
(
pMemBucket
,
pPage
->
data
,
pPage
->
num
);
tMemBucketPut
(
pMemBucket
,
pPage
->
data
,
(
int32_t
)
pPage
->
num
);
}
}
...
...
src/query/src/qResultbuf.c
浏览文件 @
afa86981
...
...
@@ -6,7 +6,7 @@
#include "queryLog.h"
#include "taoserror.h"
#define GET_DATA_PAYLOAD(_p) ((
_p)->pData + POINTER_BYTES
)
#define GET_DATA_PAYLOAD(_p) ((
tFilePage*)(((char*)(_p)->pData) + POINTER_BYTES)
)
int32_t
createDiskbasedResultBuffer
(
SDiskbasedResultBuf
**
pResultBuf
,
int32_t
rowSize
,
int32_t
pagesize
,
int32_t
inMemBufSize
,
const
void
*
handle
)
{
...
...
@@ -95,8 +95,8 @@ static int32_t allocatePositionInFile(SDiskbasedResultBuf* pResultBuf, size_t si
SFreeListItem
*
pi
=
taosArrayGet
(
pResultBuf
->
pFree
,
i
);
if
(
pi
->
len
>=
size
)
{
offset
=
pi
->
offset
;
pi
->
offset
+=
size
;
pi
->
len
-=
size
;
pi
->
offset
+=
(
int32_t
)
size
;
pi
->
len
-=
(
int32_t
)
size
;
return
offset
;
}
...
...
@@ -172,7 +172,7 @@ static char* flushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) {
// load file block data in disk
static
char
*
loadPageFromDisk
(
SDiskbasedResultBuf
*
pResultBuf
,
SPageInfo
*
pg
)
{
int32_t
ret
=
fseek
(
pResultBuf
->
file
,
pg
->
info
.
offset
,
SEEK_SET
);
ret
=
fread
(
GET_DATA_PAYLOAD
(
pg
),
1
,
pg
->
info
.
length
,
pResultBuf
->
file
);
ret
=
(
int32_t
)
fread
(
GET_DATA_PAYLOAD
(
pg
),
1
,
pg
->
info
.
length
,
pResultBuf
->
file
);
if
(
ret
!=
pg
->
info
.
length
)
{
terrno
=
errno
;
return
NULL
;
...
...
@@ -183,7 +183,7 @@ static char* loadPageFromDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) {
int32_t
fullSize
=
0
;
doDecompressData
(
GET_DATA_PAYLOAD
(
pg
),
pg
->
info
.
length
,
&
fullSize
,
pResultBuf
);
return
GET_DATA_PAYLOAD
(
pg
);
return
(
char
*
)
GET_DATA_PAYLOAD
(
pg
);
}
#define NO_AVAILABLE_PAGES(_b) ((_b)->numOfPages >= (_b)->inMemPages)
...
...
@@ -246,7 +246,7 @@ static char* evicOneDataPage(SDiskbasedResultBuf* pResultBuf) {
// all pages are referenced by user, try to allocate new space
if
(
pn
==
NULL
)
{
int32_t
prev
=
pResultBuf
->
inMemPages
;
pResultBuf
->
inMemPages
=
pResultBuf
->
inMemPages
*
1
.
5
;
pResultBuf
->
inMemPages
=
(
int32_t
)(
pResultBuf
->
inMemPages
*
1
.
5
f
)
;
qWarn
(
"%p in memory buf page not sufficient, expand from %d to %d, page size:%d"
,
pResultBuf
,
prev
,
pResultBuf
->
inMemPages
,
pResultBuf
->
pageSize
);
...
...
src/query/src/qTokenizer.c
浏览文件 @
afa86981
...
...
@@ -254,12 +254,12 @@ static const char isIdChar[] = {
static
void
*
KeywordHashTable
=
NULL
;
static
void
doInitKeywordsTable
()
{
static
void
doInitKeywordsTable
(
void
)
{
int
numOfEntries
=
tListLen
(
keywordTable
);
KeywordHashTable
=
taosHashInit
(
numOfEntries
,
MurmurHash3_32
,
false
);
for
(
int32_t
i
=
0
;
i
<
numOfEntries
;
i
++
)
{
keywordTable
[
i
].
len
=
strlen
(
keywordTable
[
i
].
name
);
keywordTable
[
i
].
len
=
(
uint8_t
)
strlen
(
keywordTable
[
i
].
name
);
void
*
ptr
=
&
keywordTable
[
i
];
taosHashPut
(
KeywordHashTable
,
keywordTable
[
i
].
name
,
keywordTable
[
i
].
len
,
(
void
*
)
&
ptr
,
POINTER_BYTES
);
}
...
...
src/query/src/qTsbuf.c
浏览文件 @
afa86981
...
...
@@ -75,7 +75,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
return
NULL
;
}
if
(
header
.
numOfVnode
>
pTSBuf
->
numOfAlloc
)
{
if
(
(
int32_t
)
header
.
numOfVnode
>
pTSBuf
->
numOfAlloc
)
{
pTSBuf
->
numOfAlloc
=
header
.
numOfVnode
;
STSVnodeBlockInfoEx
*
tmp
=
realloc
(
pTSBuf
->
pData
,
sizeof
(
STSVnodeBlockInfoEx
)
*
pTSBuf
->
numOfAlloc
);
if
(
tmp
==
NULL
)
{
...
...
@@ -171,7 +171,7 @@ static STSVnodeBlockInfoEx* tsBufGetLastVnodeInfo(STSBuf* pTSBuf) {
static
STSVnodeBlockInfoEx
*
addOneVnodeInfo
(
STSBuf
*
pTSBuf
,
int32_t
vnodeId
)
{
if
(
pTSBuf
->
numOfAlloc
<=
pTSBuf
->
numOfVnodes
)
{
uint32_t
newSize
=
(
uint32_t
)(
pTSBuf
->
numOfAlloc
*
1
.
5
);
assert
(
newSize
>
pTSBuf
->
numOfAlloc
);
assert
(
(
int32_t
)
newSize
>
pTSBuf
->
numOfAlloc
);
STSVnodeBlockInfoEx
*
tmp
=
(
STSVnodeBlockInfoEx
*
)
realloc
(
pTSBuf
->
pData
,
sizeof
(
STSVnodeBlockInfoEx
)
*
newSize
);
if
(
tmp
==
NULL
)
{
...
...
@@ -288,7 +288,7 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
* set the right position for the reversed traverse, the reversed traverse is started from
* the end of each comp data block
*/
int32_t
ret
=
fseek
(
pTSBuf
->
f
,
-
sizeof
(
pBlock
->
padding
),
SEEK_CUR
);
int32_t
ret
=
fseek
(
pTSBuf
->
f
,
-
(
int32_t
)(
sizeof
(
pBlock
->
padding
)
),
SEEK_CUR
);
size_t
sz
=
fread
(
&
pBlock
->
padding
,
sizeof
(
pBlock
->
padding
),
1
,
pTSBuf
->
f
);
UNUSED
(
sz
);
...
...
@@ -474,7 +474,7 @@ static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo
offset
=
pBlockInfo
->
offset
+
pBlockInfo
->
compLen
;
}
if
(
fseek
(
pTSBuf
->
f
,
offset
,
SEEK_SET
)
!=
0
)
{
if
(
fseek
(
pTSBuf
->
f
,
(
int32_t
)
offset
,
SEEK_SET
)
!=
0
)
{
return
-
1
;
}
...
...
@@ -524,7 +524,7 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex
* may exceed the maximum allowed size during *tsBufAppend* function by invoking expandBuffer function
*/
if
(
s
>
pTSBuf
->
tsData
.
allocSize
)
{
expandBuffer
(
&
pTSBuf
->
tsData
,
s
);
expandBuffer
(
&
pTSBuf
->
tsData
,
(
int32_t
)
s
);
}
pTSBuf
->
tsData
.
len
=
...
...
@@ -737,7 +737,7 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) {
assert
(
r
==
0
);
int64_t
offset
=
getDataStartOffset
();
int32_t
size
=
pSrcBuf
->
fileSize
-
offset
;
int32_t
size
=
(
int32_t
)
pSrcBuf
->
fileSize
-
(
int32_t
)
offset
;
ssize_t
rc
=
taosFSendFile
(
pDestBuf
->
f
,
pSrcBuf
->
f
,
&
offset
,
size
);
...
...
@@ -896,7 +896,7 @@ static int32_t doUpdateVnodeInfo(STSBuf* pTSBuf, int64_t offset, STSVnodeBlockIn
return
-
1
;
}
if
(
fseek
(
pTSBuf
->
f
,
offset
,
SEEK_SET
)
!=
0
)
{
if
(
fseek
(
pTSBuf
->
f
,
(
int32_t
)
offset
,
SEEK_SET
)
!=
0
)
{
return
-
1
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录