Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
c307a201
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
c307a201
编写于
6月 01, 2020
作者:
S
Shengliang Guan
提交者:
GitHub
6月 01, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #2089 from taosdata/feature/query
Feature/query
上级
fead65bf
a75dca95
变更
43
展开全部
隐藏空白更改
内联
并排
Showing
43 changed file
with
1138 addition
and
1277 deletion
+1138
-1277
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+1
-1
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+2
-2
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+2
-1
src/client/src/tscFunctionImpl.c
src/client/src/tscFunctionImpl.c
+58
-107
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+57
-64
src/client/src/tscSecondaryMerge.c
src/client/src/tscSecondaryMerge.c
+14
-8
src/client/src/tscServer.c
src/client/src/tscServer.c
+3
-14
src/client/src/tscSql.c
src/client/src/tscSql.c
+14
-28
src/client/src/tscSubquery.c
src/client/src/tscSubquery.c
+3
-1
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+11
-11
src/common/src/tdataformat.c
src/common/src/tdataformat.c
+1
-0
src/common/src/ttypes.c
src/common/src/ttypes.c
+34
-14
src/inc/taosdef.h
src/inc/taosdef.h
+3
-3
src/inc/taosmsg.h
src/inc/taosmsg.h
+1
-8
src/inc/tsdb.h
src/inc/tsdb.h
+4
-0
src/mnode/src/mnodeDb.c
src/mnode/src/mnodeDb.c
+4
-4
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+89
-79
src/query/inc/qextbuffer.h
src/query/inc/qextbuffer.h
+1
-2
src/query/inc/qfill.h
src/query/inc/qfill.h
+4
-3
src/query/inc/qresultBuf.h
src/query/inc/qresultBuf.h
+2
-0
src/query/inc/tsqlfunction.h
src/query/inc/tsqlfunction.h
+20
-25
src/query/inc/tvariant.h
src/query/inc/tvariant.h
+1
-1
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+280
-559
src/query/src/qFilterFunc.c
src/query/src/qFilterFunc.c
+1
-1
src/query/src/qUtil.c
src/query/src/qUtil.c
+2
-3
src/query/src/qfill.c
src/query/src/qfill.c
+20
-7
src/query/src/qresultBuf.c
src/query/src/qresultBuf.c
+8
-10
src/query/src/tvariant.c
src/query/src/tvariant.c
+46
-13
src/tsdb/src/tsdbFile.c
src/tsdb/src/tsdbFile.c
+5
-1
src/tsdb/src/tsdbMeta.c
src/tsdb/src/tsdbMeta.c
+15
-24
src/tsdb/src/tsdbRead.c
src/tsdb/src/tsdbRead.c
+239
-87
src/util/inc/hash.h
src/util/inc/hash.h
+5
-10
src/util/src/hash.c
src/util/src/hash.c
+71
-99
src/util/src/tcompare.c
src/util/src/tcompare.c
+1
-1
src/util/tests/hashTest.cpp
src/util/tests/hashTest.cpp
+4
-4
src/vnode/src/vnodeWrite.c
src/vnode/src/vnodeWrite.c
+0
-2
tests/examples/c/demo.c
tests/examples/c/demo.c
+42
-11
tests/script/general/import/commit.sim
tests/script/general/import/commit.sim
+2
-1
tests/script/general/parser/create_db.sim
tests/script/general/parser/create_db.sim
+11
-16
tests/script/general/parser/interp_test.sim
tests/script/general/parser/interp_test.sim
+2
-0
tests/script/general/parser/limit1_stb.sim
tests/script/general/parser/limit1_stb.sim
+1
-0
tests/script/general/parser/testSuite.sim
tests/script/general/parser/testSuite.sim
+40
-42
tests/script/general/parser/where.sim
tests/script/general/parser/where.sim
+14
-10
未找到文件。
src/client/inc/tscUtil.h
浏览文件 @
c307a201
...
...
@@ -175,7 +175,7 @@ SSqlExpr* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIn
SSqlExpr
*
tscSqlExprUpdate
(
SQueryInfo
*
pQueryInfo
,
int32_t
index
,
int16_t
functionId
,
int16_t
srcColumnIndex
,
int16_t
type
,
int16_t
size
);
int32
_t
tscSqlExprNumOfExprs
(
SQueryInfo
*
pQueryInfo
);
size
_t
tscSqlExprNumOfExprs
(
SQueryInfo
*
pQueryInfo
);
SSqlExpr
*
tscSqlExprGet
(
SQueryInfo
*
pQueryInfo
,
int32_t
index
);
void
tscSqlExprCopy
(
SArray
*
dst
,
const
SArray
*
src
,
uint64_t
uid
,
bool
deepcopy
);
...
...
src/client/inc/tsclient.h
浏览文件 @
c307a201
...
...
@@ -84,7 +84,7 @@ typedef struct SSqlExpr {
int16_t
functionId
;
// function id in aAgg array
int16_t
resType
;
// return value type
int16_t
resBytes
;
// length of return value
int
16
_t
interBytes
;
// inter result buffer size
int
32
_t
interBytes
;
// inter result buffer size
int16_t
numOfParams
;
// argument value of each function
tVariant
param
[
3
];
// parameters are not more than 3
int32_t
offset
;
// sub result column value of arithmetic expression.
...
...
@@ -320,7 +320,7 @@ typedef struct SSqlObj {
tsem_t
rspSem
;
SSqlCmd
cmd
;
SSqlRes
res
;
uint
8_t
numOfSubs
;
uint
16_t
numOfSubs
;
struct
SSqlObj
**
pSubs
;
struct
SSqlObj
*
prev
,
*
next
;
}
SSqlObj
;
...
...
src/client/src/tscAsync.c
浏览文件 @
c307a201
...
...
@@ -57,6 +57,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
}
pSql
->
sqlstr
=
realloc
(
pSql
->
sqlstr
,
sqlLen
+
1
);
if
(
pSql
->
sqlstr
==
NULL
)
{
tscError
(
"%p failed to malloc sql string buffer"
,
pSql
);
tscQueueAsyncError
(
fp
,
param
,
TSDB_CODE_CLI_OUT_OF_MEMORY
);
...
...
@@ -165,7 +166,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
SSqlRes
*
pRes
=
&
pSql
->
res
;
if
((
pRes
->
qhandle
==
0
||
numOfRows
!=
0
)
&&
pCmd
->
command
<
TSDB_SQL_LOCAL
)
{
if
(
pRes
->
qhandle
==
0
)
{
if
(
pRes
->
qhandle
==
0
&&
numOfRows
!=
0
)
{
tscError
(
"qhandle is NULL"
);
}
else
{
pRes
->
code
=
numOfRows
;
...
...
src/client/src/tscFunctionImpl.c
浏览文件 @
c307a201
...
...
@@ -153,7 +153,7 @@ typedef struct SRateInfo {
int32_t
getResultDataInfo
(
int32_t
dataType
,
int32_t
dataBytes
,
int32_t
functionId
,
int32_t
param
,
int16_t
*
type
,
int16_t
*
bytes
,
int
16
_t
*
interBytes
,
int16_t
extLength
,
bool
isSuperTable
)
{
int16_t
*
bytes
,
int
32
_t
*
interBytes
,
int16_t
extLength
,
bool
isSuperTable
)
{
if
(
!
isValidDataType
(
dataType
,
dataBytes
))
{
tscError
(
"Illegal data type %d or data type length %d"
,
dataType
,
dataBytes
);
return
TSDB_CODE_INVALID_SQL
;
...
...
@@ -478,7 +478,7 @@ int32_t count_load_data_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32
if
(
colId
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
{
return
BLK_DATA_NO_NEEDED
;
}
else
{
return
BLK_DATA_
FILED
S_NEEDED
;
return
BLK_DATA_
STATI
S_NEEDED
;
}
}
...
...
@@ -690,7 +690,7 @@ static void sum_func_second_merge(SQLFunctionCtx *pCtx) {
}
static
int32_t
precal_req_load_info
(
SQLFunctionCtx
*
pCtx
,
TSKEY
start
,
TSKEY
end
,
int32_t
colId
)
{
return
BLK_DATA_
FILED
S_NEEDED
;
return
BLK_DATA_
STATI
S_NEEDED
;
}
static
int32_t
data_req_load_info
(
SQLFunctionCtx
*
pCtx
,
TSKEY
start
,
TSKEY
end
,
int32_t
colId
)
{
...
...
@@ -1848,13 +1848,14 @@ static void last_row_function(SQLFunctionCtx *pCtx) {
pResInfo
->
hasResult
=
DATA_SET_FLAG
;
SLastrowInfo
*
pInfo
=
(
SLastrowInfo
*
)
pResInfo
->
interResultBuf
;
pInfo
->
ts
=
pCtx
->
param
[
0
].
i64Key
;
pInfo
->
ts
=
pCtx
->
ptsList
[
0
];
pInfo
->
hasResult
=
DATA_SET_FLAG
;
// set the result to final result buffer
if
(
pResInfo
->
superTableQ
)
{
SLastrowInfo
*
pInfo1
=
(
SLastrowInfo
*
)(
pCtx
->
aOutputBuf
+
pCtx
->
inputBytes
);
pInfo1
->
ts
=
pCtx
->
p
aram
[
0
].
i64Key
;
pInfo1
->
ts
=
pCtx
->
p
tsList
[
0
]
;
pInfo1
->
hasResult
=
DATA_SET_FLAG
;
DO_UPDATE_TAG_COLUMNS
(
pCtx
,
pInfo1
->
ts
);
...
...
@@ -1904,12 +1905,12 @@ static void valuePairAssign(tValuePair *dst, int16_t type, const char *val, int6
memcpy
(
dst
->
pTags
,
pTags
,
(
size_t
)
pTagInfo
->
tagsLen
);
}
else
{
// the tags are dumped from the ctx tag fields
for
(
int32_t
i
=
0
;
i
<
pTagInfo
->
numOfTagCols
;
++
i
)
{
SQLFunctionCtx
*
__
ctx
=
pTagInfo
->
pTagCtxList
[
i
];
if
(
__
ctx
->
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
__
ctx
->
tag
=
(
tVariant
)
{.
nType
=
TSDB_DATA_TYPE_BIGINT
,
.
i64Key
=
tsKey
};
SQLFunctionCtx
*
ctx
=
pTagInfo
->
pTagCtxList
[
i
];
if
(
ctx
->
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
ctx
->
tag
=
(
tVariant
)
{.
nType
=
TSDB_DATA_TYPE_BIGINT
,
.
i64Key
=
tsKey
};
}
tVariantDump
(
&
pTagInfo
->
pTagCtxList
[
i
]
->
tag
,
dst
->
pTags
+
size
,
pTagInfo
->
pTagCtxList
[
i
]
->
tag
.
nTyp
e
);
tVariantDump
(
&
ctx
->
tag
,
dst
->
pTags
+
size
,
ctx
->
tag
.
nType
,
tru
e
);
size
+=
pTagInfo
->
pTagCtxList
[
i
]
->
outputBytes
;
}
}
...
...
@@ -2226,7 +2227,6 @@ static STopBotInfo *getTopBotOutputInfo(SQLFunctionCtx *pCtx) {
static
void
buildTopBotStruct
(
STopBotInfo
*
pTopBotInfo
,
SQLFunctionCtx
*
pCtx
)
{
char
*
tmp
=
(
char
*
)
pTopBotInfo
+
sizeof
(
STopBotInfo
);
pTopBotInfo
->
res
=
(
tValuePair
**
)
tmp
;
tmp
+=
POINTER_BYTES
*
pCtx
->
param
[
0
].
i64Key
;
size_t
size
=
sizeof
(
tValuePair
)
+
pCtx
->
tagInfo
.
tagsLen
;
...
...
@@ -2981,14 +2981,7 @@ static void tag_project_function(SQLFunctionCtx *pCtx) {
assert
(
pCtx
->
inputBytes
==
pCtx
->
outputBytes
);
for
(
int32_t
i
=
0
;
i
<
pCtx
->
size
;
++
i
)
{
char
*
output
=
pCtx
->
aOutputBuf
;
if
(
pCtx
->
tag
.
nType
==
TSDB_DATA_TYPE_BINARY
||
pCtx
->
tag
.
nType
==
TSDB_DATA_TYPE_NCHAR
)
{
varDataSetLen
(
output
,
pCtx
->
tag
.
nLen
);
tVariantDump
(
&
pCtx
->
tag
,
varDataVal
(
output
),
pCtx
->
outputType
);
}
else
{
tVariantDump
(
&
pCtx
->
tag
,
output
,
pCtx
->
outputType
);
}
tVariantDump
(
&
pCtx
->
tag
,
pCtx
->
aOutputBuf
,
pCtx
->
outputType
,
true
);
pCtx
->
aOutputBuf
+=
pCtx
->
outputBytes
;
}
...
...
@@ -2997,13 +2990,7 @@ static void tag_project_function(SQLFunctionCtx *pCtx) {
static
void
tag_project_function_f
(
SQLFunctionCtx
*
pCtx
,
int32_t
index
)
{
INC_INIT_VAL
(
pCtx
,
1
);
char
*
output
=
pCtx
->
aOutputBuf
;
if
(
pCtx
->
tag
.
nType
==
TSDB_DATA_TYPE_BINARY
||
pCtx
->
tag
.
nType
==
TSDB_DATA_TYPE_NCHAR
)
{
*
(
int16_t
*
)
output
=
pCtx
->
tag
.
nLen
;
output
+=
VARSTR_HEADER_SIZE
;
}
tVariantDump
(
&
pCtx
->
tag
,
output
,
pCtx
->
tag
.
nType
);
tVariantDump
(
&
pCtx
->
tag
,
pCtx
->
aOutputBuf
,
pCtx
->
tag
.
nType
,
true
);
pCtx
->
aOutputBuf
+=
pCtx
->
outputBytes
;
}
...
...
@@ -3016,30 +3003,12 @@ static void tag_project_function_f(SQLFunctionCtx *pCtx, int32_t index) {
*/
static
void
tag_function
(
SQLFunctionCtx
*
pCtx
)
{
SET_VAL
(
pCtx
,
1
,
1
);
char
*
output
=
pCtx
->
aOutputBuf
;
// todo refactor to dump length presented string(var string)
if
(
pCtx
->
tag
.
nType
==
TSDB_DATA_TYPE_BINARY
||
pCtx
->
tag
.
nType
==
TSDB_DATA_TYPE_NCHAR
)
{
*
(
int16_t
*
)
output
=
pCtx
->
tag
.
nLen
;
output
+=
VARSTR_HEADER_SIZE
;
}
tVariantDump
(
&
pCtx
->
tag
,
output
,
pCtx
->
tag
.
nType
);
tVariantDump
(
&
pCtx
->
tag
,
pCtx
->
aOutputBuf
,
pCtx
->
tag
.
nType
,
true
);
}
static
void
tag_function_f
(
SQLFunctionCtx
*
pCtx
,
int32_t
index
)
{
SET_VAL
(
pCtx
,
1
,
1
);
char
*
output
=
pCtx
->
aOutputBuf
;
// todo refactor to dump length presented string(var string)
if
(
pCtx
->
tag
.
nType
==
TSDB_DATA_TYPE_BINARY
||
pCtx
->
tag
.
nType
==
TSDB_DATA_TYPE_NCHAR
)
{
*
(
int16_t
*
)
output
=
pCtx
->
tag
.
nLen
;
output
+=
VARSTR_HEADER_SIZE
;
}
tVariantDump
(
&
pCtx
->
tag
,
output
,
pCtx
->
tag
.
nType
);
tVariantDump
(
&
pCtx
->
tag
,
pCtx
->
aOutputBuf
,
pCtx
->
tag
.
nType
,
true
);
}
static
void
copy_function
(
SQLFunctionCtx
*
pCtx
)
{
...
...
@@ -3853,15 +3822,15 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) {
}
/**
* param[1]: default value/previous value of specified timestamp
* param[2]: next value of specified timestamp
* param[3]: denotes if the result is a precious result or interpolation results
*
* @param pCtx
*/
static
void
interp_function
(
SQLFunctionCtx
*
pCtx
)
{
// at this point, the value is existed, return directly
if
(
pCtx
->
param
[
3
].
i64Key
==
1
)
{
SResultInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SInterpInfoDetail
*
pInfo
=
pResInfo
->
interResultBuf
;
if
(
pCtx
->
size
==
1
)
{
char
*
pData
=
GET_INPUT_CHAR
(
pCtx
);
assignVal
(
pCtx
->
aOutputBuf
,
pData
,
pCtx
->
inputBytes
,
pCtx
->
inputType
);
}
else
{
...
...
@@ -3869,76 +3838,65 @@ static void interp_function(SQLFunctionCtx *pCtx) {
* use interpolation to generate the result.
* Note: the result of primary timestamp column uses the timestamp specified by user in the query sql
*/
assert
(
pCtx
->
param
[
3
].
i64Key
==
2
);
SInterpInfo
interpInfo
=
*
(
SInterpInfo
*
)
pCtx
->
aOutputBuf
;
SInterpInfoDetail
*
pInfoDetail
=
interpInfo
.
pInterpDetail
;
assert
(
pCtx
->
size
==
2
);
if
(
pInfo
->
type
==
TSDB_FILL_NONE
)
{
// set no output result
return
;
}
/* set no output result */
if
(
pInfoDetail
->
type
==
TSDB_FILL_NONE
)
{
pCtx
->
param
[
3
].
i64Key
=
0
;
}
else
if
(
pInfoDetail
->
primaryCol
==
1
)
{
*
(
TSKEY
*
)
pCtx
->
aOutputBuf
=
pInfoDetail
->
ts
;
if
(
pInfo
->
primaryCol
==
1
)
{
*
(
TSKEY
*
)
pCtx
->
aOutputBuf
=
pInfo
->
ts
;
}
else
{
if
(
pInfo
Detail
->
type
==
TSDB_FILL_NULL
)
{
if
(
pInfo
->
type
==
TSDB_FILL_NULL
)
{
if
(
pCtx
->
outputType
==
TSDB_DATA_TYPE_BINARY
||
pCtx
->
outputType
==
TSDB_DATA_TYPE_NCHAR
)
{
setVardataNull
(
pCtx
->
aOutputBuf
,
pCtx
->
outputType
);
}
else
{
setNull
(
pCtx
->
aOutputBuf
,
pCtx
->
outputType
,
pCtx
->
outputBytes
);
}
}
else
if
(
pInfoDetail
->
type
==
TSDB_FILL_SET_VALUE
)
{
tVariantDump
(
&
pCtx
->
param
[
1
],
pCtx
->
aOutputBuf
,
pCtx
->
inputType
);
}
else
if
(
pInfoDetail
->
type
==
TSDB_FILL_PREV
)
{
char
*
data
=
pCtx
->
param
[
1
].
pz
;
char
*
pVal
=
data
+
TSDB_KEYSIZE
;
if
(
pCtx
->
outputType
==
TSDB_DATA_TYPE_FLOAT
)
{
float
v
=
GET_DOUBLE_VAL
(
pVal
);
assignVal
(
pCtx
->
aOutputBuf
,
(
const
char
*
)
&
v
,
pCtx
->
outputBytes
,
pCtx
->
outputType
);
}
else
{
assignVal
(
pCtx
->
aOutputBuf
,
pVal
,
pCtx
->
outputBytes
,
pCtx
->
outputType
);
}
}
else
if
(
pInfoDetail
->
type
==
TSDB_FILL_LINEAR
)
{
char
*
data1
=
pCtx
->
param
[
1
].
pz
;
char
*
data2
=
pCtx
->
param
[
2
].
pz
;
char
*
pVal1
=
data1
+
TSDB_KEYSIZE
;
char
*
pVal2
=
data2
+
TSDB_KEYSIZE
;
SPoint
point1
=
{.
key
=
*
(
TSKEY
*
)
data1
,
.
val
=
&
pCtx
->
param
[
1
].
i64Key
};
SPoint
point2
=
{.
key
=
*
(
TSKEY
*
)
data2
,
.
val
=
&
pCtx
->
param
[
2
].
i64Key
};
SPoint
point
=
{.
key
=
pInfoDetail
->
ts
,
.
val
=
pCtx
->
aOutputBuf
};
SET_VAL
(
pCtx
,
pCtx
->
size
,
1
);
}
else
if
(
pInfo
->
type
==
TSDB_FILL_SET_VALUE
)
{
tVariantDump
(
&
pCtx
->
param
[
1
],
pCtx
->
aOutputBuf
,
pCtx
->
inputType
,
true
);
}
else
if
(
pInfo
->
type
==
TSDB_FILL_PREV
)
{
char
*
data
=
GET_INPUT_CHAR_INDEX
(
pCtx
,
0
);
assignVal
(
pCtx
->
aOutputBuf
,
data
,
pCtx
->
outputBytes
,
pCtx
->
outputType
);
SET_VAL
(
pCtx
,
pCtx
->
size
,
1
);
}
else
if
(
pInfo
->
type
==
TSDB_FILL_LINEAR
)
{
char
*
data1
=
GET_INPUT_CHAR_INDEX
(
pCtx
,
0
);
char
*
data2
=
GET_INPUT_CHAR_INDEX
(
pCtx
,
1
);
TSKEY
key1
=
pCtx
->
ptsList
[
0
];
TSKEY
key2
=
pCtx
->
ptsList
[
1
];
SPoint
point1
=
{.
key
=
key1
,
.
val
=
data1
};
SPoint
point2
=
{.
key
=
key2
,
.
val
=
data2
};
SPoint
point
=
{.
key
=
pInfo
->
ts
,
.
val
=
pCtx
->
aOutputBuf
};
int32_t
srcType
=
pCtx
->
inputType
;
if
((
srcType
>=
TSDB_DATA_TYPE_TINYINT
&&
srcType
<=
TSDB_DATA_TYPE_BIGINT
)
||
srcType
==
TSDB_DATA_TYPE_TIMESTAMP
||
srcType
==
TSDB_DATA_TYPE_DOUBLE
)
{
point1
.
val
=
pVal1
;
point2
.
val
=
pVal2
;
if
(
isNull
(
pVal1
,
srcType
)
||
isNull
(
pVal2
,
srcType
))
{
point1
.
val
=
data1
;
point2
.
val
=
data2
;
if
(
isNull
(
data1
,
srcType
)
||
isNull
(
data2
,
srcType
))
{
setNull
(
pCtx
->
aOutputBuf
,
srcType
,
pCtx
->
inputBytes
);
}
else
{
taosDoLinearInterpolation
(
pCtx
->
outputType
,
&
point1
,
&
point2
,
&
point
);
}
}
else
if
(
srcType
==
TSDB_DATA_TYPE_FLOAT
)
{
float
v1
=
GET_DOUBLE_VAL
(
pVal1
);
float
v2
=
GET_DOUBLE_VAL
(
pVal2
);
point1
.
val
=
&
v1
;
point2
.
val
=
&
v2
;
if
(
isNull
(
pVal1
,
srcType
)
||
isNull
(
pVal2
,
srcType
))
{
point1
.
val
=
data1
;
point2
.
val
=
data2
;
if
(
isNull
(
data1
,
srcType
)
||
isNull
(
data2
,
srcType
))
{
setNull
(
pCtx
->
aOutputBuf
,
srcType
,
pCtx
->
inputBytes
);
}
else
{
taosDoLinearInterpolation
(
pCtx
->
outputType
,
&
point1
,
&
point2
,
&
point
);
}
}
else
{
if
(
srcType
==
TSDB_DATA_TYPE_BINARY
||
srcType
==
TSDB_DATA_TYPE_NCHAR
)
{
setVardataNull
(
pCtx
->
aOutputBuf
,
pCtx
->
input
Bytes
);
setVardataNull
(
pCtx
->
aOutputBuf
,
pCtx
->
input
Type
);
}
else
{
setNull
(
pCtx
->
aOutputBuf
,
srcType
,
pCtx
->
inputBytes
);
}
...
...
@@ -3946,15 +3904,8 @@ static void interp_function(SQLFunctionCtx *pCtx) {
}
}
free
(
interpInfo
.
pInterpDetail
);
}
pCtx
->
size
=
pCtx
->
param
[
3
].
i64Key
;
tVariantDestroy
(
&
pCtx
->
param
[
1
]);
tVariantDestroy
(
&
pCtx
->
param
[
2
]);
// data in the check operation are all null, not output
SET_VAL
(
pCtx
,
pCtx
->
size
,
1
);
}
...
...
@@ -4910,7 +4861,7 @@ SQLAggFuncElem aAggs[] = {{
"interp"
,
TSDB_FUNC_INTERP
,
TSDB_FUNC_INTERP
,
TSDB_FUNCSTATE_SO
|
TSDB_FUNCSTATE_OF
|
TSDB_FUNCSTATE_STABLE
|
TSDB_FUNCSTATE_NEED_TS
,
TSDB_FUNCSTATE_SO
|
TSDB_FUNCSTATE_OF
|
TSDB_FUNCSTATE_STABLE
|
TSDB_FUNCSTATE_NEED_TS
,
function_setup
,
interp_function
,
do_sum_f
,
// todo filter handle
...
...
@@ -4918,7 +4869,7 @@ SQLAggFuncElem aAggs[] = {{
doFinalizer
,
noop1
,
copy_function
,
no_data
_info
,
data_req_load
_info
,
},
{
// 28
...
...
src/client/src/tscSQLParser.c
浏览文件 @
c307a201
...
...
@@ -142,7 +142,7 @@ static int setColumnFilterInfoForTimestamp(SQueryInfo* pQueryInfo, tVariant* pVa
return
invalidSqlErrMsg
(
pQueryInfo
->
msg
,
msg
);
}
}
else
{
if
(
tVariantDump
(
pVar
,
(
char
*
)
&
time
,
TSDB_DATA_TYPE_BIGINT
))
{
if
(
tVariantDump
(
pVar
,
(
char
*
)
&
time
,
TSDB_DATA_TYPE_BIGINT
,
true
))
{
return
invalidSqlErrMsg
(
pQueryInfo
->
msg
,
msg
);
}
}
...
...
@@ -1403,7 +1403,6 @@ int32_t addProjectionExprAndResultField(SQueryInfo* pQueryInfo, tSQLExprItem* pI
SSchema
colSchema
=
{.
type
=
TSDB_DATA_TYPE_BINARY
,
.
bytes
=
TSDB_TABLE_NAME_LEN
+
VARSTR_HEADER_SIZE
};
strcpy
(
colSchema
.
name
,
TSQL_TBNAME_L
);
pQueryInfo
->
type
=
TSDB_QUERY_TYPE_STABLE_QUERY
;
tscAddSpecialColumnForSelect
(
pQueryInfo
,
startPos
,
TSDB_FUNC_TAGPRJ
,
&
index
,
&
colSchema
,
true
);
}
else
{
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
index
.
tableIndex
);
...
...
@@ -1595,7 +1594,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
int16_t
resultType
=
0
;
int16_t
resultSize
=
0
;
int
16
_t
intermediateResSize
=
0
;
int
32
_t
intermediateResSize
=
0
;
int16_t
functionID
=
0
;
if
(
changeFunctionID
(
optr
,
&
functionID
)
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -1628,14 +1627,14 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
if
(
optr
==
TK_LEASTSQUARES
)
{
/* set the leastsquares parameters */
char
val
[
8
]
=
{
0
};
if
(
tVariantDump
(
&
pParamElem
[
1
].
pNode
->
val
,
val
,
TSDB_DATA_TYPE_DOUBLE
)
<
0
)
{
if
(
tVariantDump
(
&
pParamElem
[
1
].
pNode
->
val
,
val
,
TSDB_DATA_TYPE_DOUBLE
,
true
)
<
0
)
{
return
TSDB_CODE_INVALID_SQL
;
}
addExprParams
(
pExpr
,
val
,
TSDB_DATA_TYPE_DOUBLE
,
DOUBLE_BYTES
,
0
);
memset
(
val
,
0
,
tListLen
(
val
));
if
(
tVariantDump
(
&
pParamElem
[
2
].
pNode
->
val
,
val
,
TSDB_DATA_TYPE_DOUBLE
)
<
0
)
{
if
(
tVariantDump
(
&
pParamElem
[
2
].
pNode
->
val
,
val
,
TSDB_DATA_TYPE_DOUBLE
,
true
)
<
0
)
{
return
TSDB_CODE_INVALID_SQL
;
}
...
...
@@ -1795,7 +1794,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
SSqlExpr
*
pExpr
=
NULL
;
if
(
optr
==
TK_PERCENTILE
||
optr
==
TK_APERCENTILE
)
{
tVariantDump
(
pVariant
,
val
,
TSDB_DATA_TYPE_DOUBLE
);
tVariantDump
(
pVariant
,
val
,
TSDB_DATA_TYPE_DOUBLE
,
true
);
double
dp
=
GET_DOUBLE_VAL
(
val
);
if
(
dp
<
0
||
dp
>
TOP_BOTTOM_QUERY_LIMIT
)
{
...
...
@@ -1818,7 +1817,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
pExpr
=
tscSqlExprAppend
(
pQueryInfo
,
functionId
,
&
index
,
resultType
,
resultSize
,
resultSize
,
false
);
addExprParams
(
pExpr
,
val
,
TSDB_DATA_TYPE_DOUBLE
,
sizeof
(
double
),
0
);
}
else
{
tVariantDump
(
pVariant
,
val
,
TSDB_DATA_TYPE_BIGINT
);
tVariantDump
(
pVariant
,
val
,
TSDB_DATA_TYPE_BIGINT
,
true
);
int64_t
nTop
=
*
((
int32_t
*
)
val
);
if
(
nTop
<=
0
||
nTop
>
100
)
{
// todo use macro
...
...
@@ -1902,7 +1901,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
int16_t
bytes
=
0
;
int16_t
type
=
0
;
int
16
_t
inter
=
0
;
int
32
_t
inter
=
0
;
int32_t
ret
=
getResultDataInfo
(
s
.
type
,
s
.
bytes
,
TSDB_FUNC_TID_TAG
,
0
,
&
type
,
&
bytes
,
&
inter
,
0
,
0
);
assert
(
ret
==
TSDB_CODE_SUCCESS
);
...
...
@@ -2288,7 +2287,7 @@ int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) {
int16_t
bytes
=
0
;
int16_t
type
=
0
;
int
16_t
intermediate
Bytes
=
0
;
int
32_t
inter
Bytes
=
0
;
size_t
size
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
for
(
int32_t
k
=
0
;
k
<
size
;
++
k
)
{
...
...
@@ -2302,13 +2301,13 @@ int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) {
(
functionId
>=
TSDB_FUNC_FIRST_DST
&&
functionId
<=
TSDB_FUNC_LAST_DST
)
||
(
functionId
>=
TSDB_FUNC_RATE
&&
functionId
<=
TSDB_FUNC_AVG_IRATE
))
{
if
(
getResultDataInfo
(
pSrcSchema
->
type
,
pSrcSchema
->
bytes
,
functionId
,
pExpr
->
param
[
0
].
i64Key
,
&
type
,
&
bytes
,
&
inter
mediate
Bytes
,
0
,
true
)
!=
TSDB_CODE_SUCCESS
)
{
&
interBytes
,
0
,
true
)
!=
TSDB_CODE_SUCCESS
)
{
return
TSDB_CODE_INVALID_SQL
;
}
tscSqlExprUpdate
(
pQueryInfo
,
k
,
functionId
,
pExpr
->
colInfo
.
colIndex
,
TSDB_DATA_TYPE_BINARY
,
bytes
);
// todo refactor
pExpr
->
interBytes
=
inter
mediate
Bytes
;
pExpr
->
interBytes
=
interBytes
;
}
}
...
...
@@ -2328,27 +2327,23 @@ void tscRestoreSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) {
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
i
);
SSchema
*
pSchema
=
tscGetTableColumnSchema
(
pTableMetaInfo
->
pTableMeta
,
pExpr
->
colInfo
.
colIndex
);
// if (/*(pExpr->functionId >= TSDB_FUNC_FIRST_DST && pExpr->functionId <= TSDB_FUNC_LAST_DST) ||
// (pExpr->functionId >= TSDB_FUNC_SUM && pExpr->functionId <= TSDB_FUNC_MAX) ||
// pExpr->functionId == TSDB_FUNC_LAST_ROW*/) {
// the final result size and type in the same as query on single table.
// so here, set the flag to be false;
int16_t
inter
=
0
;
int32_t
functionId
=
pExpr
->
functionId
;
if
(
functionId
>=
TSDB_FUNC_TS
&&
functionId
<=
TSDB_FUNC_DIFF
)
{
continue
;
}
if
(
functionId
==
TSDB_FUNC_FIRST_DST
)
{
functionId
=
TSDB_FUNC_FIRST
;
}
else
if
(
functionId
==
TSDB_FUNC_LAST_DST
)
{
functionId
=
TSDB_FUNC_LAST
;
}
getResultDataInfo
(
pSchema
->
type
,
pSchema
->
bytes
,
functionId
,
0
,
&
pExpr
->
resType
,
&
pExpr
->
resBytes
,
&
inter
,
0
,
false
);
// }
// the final result size and type in the same as query on single table.
// so here, set the flag to be false;
int32_t
inter
=
0
;
int32_t
functionId
=
pExpr
->
functionId
;
if
(
functionId
>=
TSDB_FUNC_TS
&&
functionId
<=
TSDB_FUNC_DIFF
)
{
continue
;
}
if
(
functionId
==
TSDB_FUNC_FIRST_DST
)
{
functionId
=
TSDB_FUNC_FIRST
;
}
else
if
(
functionId
==
TSDB_FUNC_LAST_DST
)
{
functionId
=
TSDB_FUNC_LAST
;
}
getResultDataInfo
(
pSchema
->
type
,
pSchema
->
bytes
,
functionId
,
0
,
&
pExpr
->
resType
,
&
pExpr
->
resBytes
,
&
inter
,
0
,
false
);
}
}
...
...
@@ -2631,23 +2626,23 @@ static int32_t doExtractColumnFilterInfo(SQueryInfo* pQueryInfo, SColumnFilterIn
}
if
(
pExpr
->
nSQLOptr
==
TK_LE
||
pExpr
->
nSQLOptr
==
TK_LT
)
{
tVariantDump
(
&
pRight
->
val
,
(
char
*
)
&
pColumnFilter
->
upperBndd
,
colType
);
tVariantDump
(
&
pRight
->
val
,
(
char
*
)
&
pColumnFilter
->
upperBndd
,
colType
,
false
);
}
else
{
// TK_GT,TK_GE,TK_EQ,TK_NE are based on the pColumn->lowerBndd
if
(
colType
==
TSDB_DATA_TYPE_BINARY
)
{
pColumnFilter
->
pz
=
(
int64_t
)
calloc
(
1
,
pRight
->
val
.
nLen
+
1
);
pColumnFilter
->
len
=
pRight
->
val
.
nLen
;
tVariantDump
(
&
pRight
->
val
,
(
char
*
)
pColumnFilter
->
pz
,
colType
);
tVariantDump
(
&
pRight
->
val
,
(
char
*
)
pColumnFilter
->
pz
,
colType
,
false
);
}
else
if
(
colType
==
TSDB_DATA_TYPE_NCHAR
)
{
// pRight->val.nLen + 1 is larger than the actual nchar string length
pColumnFilter
->
pz
=
(
int64_t
)
calloc
(
1
,
(
pRight
->
val
.
nLen
+
1
)
*
TSDB_NCHAR_SIZE
);
tVariantDump
(
&
pRight
->
val
,
(
char
*
)
pColumnFilter
->
pz
,
colType
);
tVariantDump
(
&
pRight
->
val
,
(
char
*
)
pColumnFilter
->
pz
,
colType
,
false
);
size_t
len
=
wcslen
((
wchar_t
*
)
pColumnFilter
->
pz
);
pColumnFilter
->
len
=
len
*
TSDB_NCHAR_SIZE
;
}
else
{
tVariantDump
(
&
pRight
->
val
,
(
char
*
)
&
pColumnFilter
->
lowerBndd
,
colType
);
tVariantDump
(
&
pRight
->
val
,
(
char
*
)
&
pColumnFilter
->
lowerBndd
,
colType
,
false
);
}
}
...
...
@@ -3336,9 +3331,8 @@ static int32_t handleExprInQueryCond(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, S
*
pExpr
=
NULL
;
// remove this expression
*
type
=
TSQL_EXPR_TS
;
}
else
if
(
index
.
columnIndex
>=
tscGetNumOfColumns
(
pTableMeta
)
||
index
.
columnIndex
==
TSDB_TBNAME_COLUMN_INDEX
)
{
// query on tags
// check for tag query condition
}
else
if
(
index
.
columnIndex
>=
tscGetNumOfColumns
(
pTableMeta
)
||
index
.
columnIndex
==
TSDB_TBNAME_COLUMN_INDEX
)
{
// query on tags, check for tag query condition
if
(
UTIL_TABLE_IS_NORMAL_TABLE
(
pTableMetaInfo
))
{
return
invalidSqlErrMsg
(
pQueryInfo
->
msg
,
msg1
);
}
...
...
@@ -3933,7 +3927,7 @@ int32_t getTimeRange(STimeWindow* win, tSQLExpr* pRight, int32_t optr, int16_t t
* failed to parse timestamp in regular formation, try next
* it may be a epoch time in string format
*/
tVariantDump
(
&
pRight
->
val
,
(
char
*
)
&
val
,
TSDB_DATA_TYPE_BIGINT
);
tVariantDump
(
&
pRight
->
val
,
(
char
*
)
&
val
,
TSDB_DATA_TYPE_BIGINT
,
true
);
/*
* transfer it into MICROSECOND format if it is a string, since for
...
...
@@ -4070,14 +4064,13 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) {
continue
;
}
int32_t
ret
=
tVariantDump
(
&
pFillToken
->
a
[
j
].
pVar
,
(
char
*
)
&
pQueryInfo
->
fillVal
[
i
],
pFields
->
type
);
int32_t
ret
=
tVariantDump
(
&
pFillToken
->
a
[
j
].
pVar
,
(
char
*
)
&
pQueryInfo
->
fillVal
[
i
],
pFields
->
type
,
true
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
invalidSqlErrMsg
(
pQueryInfo
->
msg
,
msg
);
}
}
if
((
pFillToken
->
nExpr
<
size
)
||
((
pFillToken
->
nExpr
-
1
<
size
)
&&
(
tscIsPointInterpQuery
(
pQueryInfo
))))
{
if
((
pFillToken
->
nExpr
<
size
)
||
((
pFillToken
->
nExpr
-
1
<
size
)
&&
(
tscIsPointInterpQuery
(
pQueryInfo
))))
{
tVariantListItem
*
lastItem
=
&
pFillToken
->
a
[
pFillToken
->
nExpr
-
1
];
for
(
int32_t
i
=
numOfFillVal
;
i
<
size
;
++
i
)
{
...
...
@@ -4086,7 +4079,7 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) {
if
(
pFields
->
type
==
TSDB_DATA_TYPE_BINARY
||
pFields
->
type
==
TSDB_DATA_TYPE_NCHAR
)
{
setVardataNull
((
char
*
)
&
pQueryInfo
->
fillVal
[
i
],
pFields
->
type
);
}
else
{
tVariantDump
(
&
lastItem
->
pVar
,
(
char
*
)
&
pQueryInfo
->
fillVal
[
i
],
pFields
->
type
);
tVariantDump
(
&
lastItem
->
pVar
,
(
char
*
)
&
pQueryInfo
->
fillVal
[
i
],
pFields
->
type
,
true
);
}
}
}
...
...
@@ -4168,6 +4161,10 @@ int32_t parseOrderbyClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSchema
if
(
index
.
columnIndex
>=
tscGetNumOfColumns
(
pTableMetaInfo
->
pTableMeta
))
{
int32_t
relTagIndex
=
index
.
columnIndex
-
tscGetNumOfColumns
(
pTableMetaInfo
->
pTableMeta
);
// it is a tag column
if
(
pQueryInfo
->
groupbyExpr
.
columnInfo
==
NULL
)
{
return
invalidSqlErrMsg
(
pQueryInfo
->
msg
,
msg2
);
}
SColIndex
*
pColIndex
=
taosArrayGet
(
pQueryInfo
->
groupbyExpr
.
columnInfo
,
0
);
if
(
relTagIndex
==
pColIndex
->
colIndex
)
{
orderByTags
=
true
;
...
...
@@ -4420,10 +4417,10 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
SSchema
*
pTagsSchema
=
tscGetTableColumnSchema
(
pTableMetaInfo
->
pTableMeta
,
columnIndex
.
columnIndex
);
if
(
tVariantDump
(
&
pVarList
->
a
[
1
].
pVar
,
pAlterSQL
->
tagData
.
data
/*pCmd->payload*/
,
pTagsSchema
->
type
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
tVariantDump
(
&
pVarList
->
a
[
1
].
pVar
,
pAlterSQL
->
tagData
.
data
,
pTagsSchema
->
type
,
true
)
!=
TSDB_CODE_SUCCESS
)
{
return
invalidSqlErrMsg
(
pQueryInfo
->
msg
,
msg13
);
}
pAlterSQL
->
tagData
.
dataLen
=
pTagsSchema
->
bytes
;
// validate the length of binary
...
...
@@ -4680,7 +4677,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL*
const
char
*
msg0
=
"soffset/offset can not be less than 0"
;
const
char
*
msg1
=
"slimit/soffset only available for STable query"
;
const
char
*
msg2
=
"function
not supported on table
"
;
const
char
*
msg2
=
"function
s mixed up in table query
"
;
const
char
*
msg3
=
"slimit/soffset can not apply to projection query"
;
// handle the limit offset value, validate the limit
...
...
@@ -4763,14 +4760,22 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL*
}
size_t
size
=
taosArrayGetSize
(
pQueryInfo
->
exprList
);
bool
hasTags
=
false
;
bool
hasOtherFunc
=
false
;
// filter the query functions operating on "tbname" column that are not supported by normal columns.
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
i
);
if
(
pExpr
->
colInfo
.
colIndex
==
TSDB_TBNAME_COLUMN_INDEX
)
{
return
invalidSqlErrMsg
(
pQueryInfo
->
msg
,
msg2
);
if
(
TSDB_COL_IS_TAG
(
pExpr
->
colInfo
.
flag
))
{
hasTags
=
true
;
}
else
{
hasOtherFunc
=
true
;
}
}
if
(
hasTags
&&
hasOtherFunc
)
{
return
invalidSqlErrMsg
(
pQueryInfo
->
msg
,
msg2
);
}
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -5571,21 +5576,9 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
if
(
pList
->
a
[
i
].
pVar
.
nLen
+
VARSTR_HEADER_SIZE
>
pTagSchema
[
i
].
bytes
)
{
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg3
);
}
ret
=
tVariantDump
(
&
(
pList
->
a
[
i
].
pVar
),
varDataVal
(
tagVal
),
pTagSchema
[
i
].
type
);
if
(
pList
->
a
[
i
].
pVar
.
nType
==
TSDB_DATA_TYPE_NULL
)
{
if
(
pTagSchema
[
i
].
type
==
TSDB_DATA_TYPE_BINARY
)
{
varDataSetLen
(
tagVal
,
sizeof
(
uint8_t
));
}
else
{
varDataSetLen
(
tagVal
,
sizeof
(
uint32_t
));
}
}
else
{
// todo refactor
varDataSetLen
(
tagVal
,
pList
->
a
[
i
].
pVar
.
nLen
);
}
}
else
{
ret
=
tVariantDump
(
&
(
pList
->
a
[
i
].
pVar
),
tagVal
,
pTagSchema
[
i
].
type
);
}
ret
=
tVariantDump
(
&
(
pList
->
a
[
i
].
pVar
),
tagVal
,
pTagSchema
[
i
].
type
,
true
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg4
);
}
...
...
@@ -5845,7 +5838,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
pQueryInfo
->
window
.
ekey
=
pQueryInfo
->
window
.
ekey
/
1000
;
}
}
else
{
// set the time rang
pQueryInfo
->
window
.
skey
=
0
;
pQueryInfo
->
window
.
skey
=
TSKEY_INITIAL_VAL
;
pQueryInfo
->
window
.
ekey
=
INT64_MAX
;
}
...
...
src/client/src/tscSecondaryMerge.c
浏览文件 @
c307a201
...
...
@@ -689,7 +689,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
SSchema
*
p1
=
tscGetTableColumnSchema
(
pTableMetaInfo
->
pTableMeta
,
pExpr
->
colInfo
.
colIndex
);
int
16
_t
inter
=
0
;
int
32
_t
inter
=
0
;
int16_t
type
=
-
1
;
int16_t
bytes
=
0
;
...
...
@@ -1049,7 +1049,14 @@ static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer,
int32_t
functionId
=
pExpr
->
functionId
;
if
(
functionId
==
TSDB_FUNC_TAG_DUMMY
||
functionId
==
TSDB_FUNC_TAG
||
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
tVariantDestroy
(
&
pCtx
->
tag
);
tVariantCreateFromBinary
(
&
pCtx
->
tag
,
pCtx
->
aInputElemBuf
,
pCtx
->
inputBytes
,
pCtx
->
inputType
);
char
*
input
=
pCtx
->
aInputElemBuf
;
if
(
pCtx
->
inputType
==
TSDB_DATA_TYPE_BINARY
||
pCtx
->
inputType
==
TSDB_DATA_TYPE_NCHAR
)
{
assert
(
varDataLen
(
input
)
<=
pCtx
->
inputBytes
);
tVariantCreateFromBinary
(
&
pCtx
->
tag
,
varDataVal
(
input
),
varDataLen
(
input
),
pCtx
->
inputType
);
}
else
{
tVariantCreateFromBinary
(
&
pCtx
->
tag
,
input
,
pCtx
->
inputBytes
,
pCtx
->
inputType
);
}
}
pCtx
->
currentStage
=
SECONDARY_STAGE_MERGE
;
...
...
@@ -1309,7 +1316,7 @@ static bool isAllSourcesCompleted(SLocalReducer *pLocalReducer) {
return
(
pLocalReducer
->
numOfBuffer
==
pLocalReducer
->
numOfCompleted
);
}
static
bool
do
InterpolationForCurrent
Group
(
SSqlObj
*
pSql
)
{
static
bool
do
BuildFilledResultFor
Group
(
SSqlObj
*
pSql
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
...
...
@@ -1347,8 +1354,8 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
SLocalReducer
*
pLocalReducer
=
pRes
->
pLocalReducer
;
SFillInfo
*
pFillInfo
=
pLocalReducer
->
pFillInfo
;
SLocalReducer
*
pLocalReducer
=
pRes
->
pLocalReducer
;
SFillInfo
*
pFillInfo
=
pLocalReducer
->
pFillInfo
;
bool
prevGroupCompleted
=
(
!
pLocalReducer
->
discard
)
&&
pLocalReducer
->
hasUnprocessedRow
;
...
...
@@ -1445,7 +1452,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
return
TSDB_CODE_SUCCESS
;
}
if
(
do
InterpolationForCurrent
Group
(
pSql
))
{
if
(
do
BuildFilledResultFor
Group
(
pSql
))
{
pLocalReducer
->
status
=
TSC_LOCALREDUCE_READY
;
// set the flag, taos_free_result can release this result.
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1464,8 +1471,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
#ifdef _DEBUG_VIEW
printf
(
"chosen data in pTree[0] = %d
\n
"
,
pTree
->
pNode
[
0
].
index
);
#endif
assert
((
pTree
->
pNode
[
0
].
index
<
pLocalReducer
->
numOfBuffer
)
&&
(
pTree
->
pNode
[
0
].
index
>=
0
)
&&
tmpBuffer
->
num
==
0
);
assert
((
pTree
->
pNode
[
0
].
index
<
pLocalReducer
->
numOfBuffer
)
&&
(
pTree
->
pNode
[
0
].
index
>=
0
)
&&
tmpBuffer
->
num
==
0
);
// chosen from loser tree
SLocalDataSource
*
pOneDataSrc
=
pLocalReducer
->
pLocalDataSrc
[
pTree
->
pNode
[
0
].
index
];
...
...
src/client/src/tscServer.c
浏览文件 @
c307a201
...
...
@@ -651,7 +651,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg
->
order
=
htons
(
pQueryInfo
->
order
.
order
);
pQueryMsg
->
orderColId
=
htons
(
pQueryInfo
->
order
.
orderColId
);
pQueryMsg
->
fillType
=
htons
(
pQueryInfo
->
fillType
);
pQueryMsg
->
fillType
=
htons
(
pQueryInfo
->
fillType
);
pQueryMsg
->
limit
=
htobe64
(
pQueryInfo
->
limit
.
limit
);
pQueryMsg
->
offset
=
htobe64
(
pQueryInfo
->
limit
.
offset
);
pQueryMsg
->
numOfCols
=
htons
(
taosArrayGetSize
(
pQueryInfo
->
colList
));
...
...
@@ -1287,7 +1287,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pAlterTableMsg
->
numOfCols
=
htons
(
tscNumOfFields
(
pQueryInfo
));
SSchema
*
pSchema
=
pAlterTableMsg
->
schema
;
for
(
int
i
=
0
;
i
<
pAlterTableMsg
->
numOfCols
;
++
i
)
{
for
(
int
i
=
0
;
i
<
tscNumOfFields
(
pQueryInfo
)
;
++
i
)
{
TAOS_FIELD
*
pField
=
tscFieldInfoGetField
(
&
pQueryInfo
->
fieldsInfo
,
i
);
pSchema
->
type
=
pField
->
type
;
...
...
@@ -1843,17 +1843,6 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
size_t
size
=
0
;
STableMeta
*
pTableMeta
=
tscCreateTableMetaFromMsg
(
pMetaMsg
,
&
size
);
#if 0
// if current table is created according to super table, get the table meta of super table
if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
char id[TSDB_TABLE_ID_LEN + 1] = {0};
strncpy(id, pMetaMsg->stableId, TSDB_TABLE_ID_LEN);
// NOTE: if the table meta of super table is not cached at client side yet, the pSTable is NULL
pTableMeta->pSTable = taosCacheAcquireByName(tscCacheHandle, id);
}
#endif
// todo add one more function: taosAddDataIfNotExists();
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
&
pSql
->
cmd
,
0
,
0
);
...
...
@@ -1976,7 +1965,7 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
pSql->res.code = TSDB_CODE_SUCCESS;
pSql->res.numOfTotal = i;
tscTrace("%p load multi-metermeta resp complete num:%d", pSql, pSql->res.numOfTotal);
tscTrace("%p load multi-metermeta resp
from
complete num:%d", pSql, pSql->res.numOfTotal);
#endif
return
TSDB_CODE_SUCCESS
;
...
...
src/client/src/tscSql.c
浏览文件 @
c307a201
...
...
@@ -284,12 +284,11 @@ int taos_query(TAOS *taos, const char *sqlstr) {
}
SSqlObj
*
pSql
=
pObj
->
pSql
;
size_t
sqlLen
=
strlen
(
sqlstr
);
size_t
sqlLen
=
strlen
(
sqlstr
);
doAsyncQuery
(
pObj
,
pSql
,
waitForQueryRsp
,
taos
,
sqlstr
,
sqlLen
);
// wait for the callback function to post the semaphore
sem_wait
(
&
pSql
->
rspSem
);
t
sem_wait
(
&
pSql
->
rspSem
);
return
pSql
->
res
.
code
;
}
...
...
@@ -525,7 +524,7 @@ int taos_select_db(TAOS *taos, const char *db) {
return
taos_query
(
taos
,
sql
);
}
void
taos_free_result
_imp
(
TAOS_RES
*
res
,
int
keepCmd
)
{
void
taos_free_result
(
TAOS_RES
*
res
)
{
if
(
res
==
NULL
)
return
;
SSqlObj
*
pSql
=
(
SSqlObj
*
)
res
;
...
...
@@ -536,26 +535,23 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
if
(
pSql
->
signature
!=
pSql
)
return
;
STscObj
*
pObj
=
pSql
->
pTscObj
;
if
(
pRes
==
NULL
||
pRes
->
qhandle
==
0
)
{
/* Query rsp is not received from vnode, so the qhandle is NULL */
tscTrace
(
"%p qhandle is null, abort free, fp:%p"
,
pSql
,
pSql
->
fp
);
STscObj
*
pTscObj
=
pSql
->
pTscObj
;
if
(
pTscObj
->
pSql
!=
pSql
)
{
// The semaphore can not be changed while freeing async sub query objects.
if
(
pObj
->
pSql
!=
pSql
)
{
tscTrace
(
"%p SqlObj is freed by app"
,
pSql
);
tscFreeSqlObj
(
pSql
);
}
else
{
if
(
keepCmd
)
{
tscFreeSqlResult
(
pSql
);
}
else
{
tscPartiallyFreeSqlObj
(
pSql
);
}
tscPartiallyFreeSqlObj
(
pSql
);
}
return
;
}
// set freeFlag to 1 in retrieve message if there are un-retrieved results
// set freeFlag to 1 in retrieve message if there are un-retrieved results
data in node
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
if
(
pQueryInfo
==
NULL
)
{
tscPartiallyFreeSqlObj
(
pSql
);
...
...
@@ -563,6 +559,7 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
}
pQueryInfo
->
type
=
TSDB_QUERY_TYPE_FREE_RESOURCE
;
STscObj
*
pTscObj
=
pSql
->
pTscObj
;
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
...
...
@@ -579,9 +576,8 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
if
((
pCmd
->
command
==
TSDB_SQL_SELECT
||
pCmd
->
command
==
TSDB_SQL_SHOW
||
pCmd
->
command
==
TSDB_SQL_RETRIEVE
||
pCmd
->
command
==
TSDB_SQL_FETCH
)
&&
pRes
->
code
==
TSDB_CODE_SUCCESS
&&
((
pCmd
->
command
<
TSDB_SQL_LOCAL
&&
pRes
->
completed
==
false
)
||
(
pCmd
->
command
==
TSDB_SQL_SELECT
&&
pSql
->
pStream
==
NULL
&&
pTableMetaInfo
->
pTableMeta
!=
NULL
)))
{
pCmd
->
command
==
TSDB_SQL_FETCH
)
&&
pRes
->
code
==
TSDB_CODE_SUCCESS
&&
pRes
->
completed
==
false
&&
(
pCmd
->
command
==
TSDB_SQL_SELECT
&&
pSql
->
pStream
==
NULL
&&
pTableMetaInfo
->
pTableMeta
!=
NULL
))
{
pCmd
->
command
=
(
pCmd
->
command
>
TSDB_SQL_MGMT
)
?
TSDB_SQL_RETRIEVE
:
TSDB_SQL_FETCH
;
tscTrace
(
"%p send msg to free qhandle in vnode, code:%d, numOfRows:%d, command:%s"
,
pSql
,
pRes
->
code
,
pRes
->
numOfRows
,
...
...
@@ -591,30 +587,20 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
tscProcessSql
(
pSql
);
// waits for response and then goes on
STscObj
*
pTscObj
=
pSql
->
pTscObj
;
if
(
pTscObj
->
pSql
==
pSql
)
{
sem_wait
(
&
pSql
->
rspSem
);
}
}
else
{
// if no free resource msg is sent to vnode, we free this object immediately.
STscObj
*
pTscObj
=
pSql
->
pTscObj
;
if
(
pTscObj
->
pSql
!=
pSql
)
{
tscFreeSqlObj
(
pSql
);
tscTrace
(
"%p sql result is freed by app"
,
pSql
);
}
else
{
if
(
keepCmd
)
{
tscFreeSqlResult
(
pSql
);
tscTrace
(
"%p sql result is freed while sql command is kept"
,
pSql
);
}
else
{
tscPartiallyFreeSqlObj
(
pSql
);
tscTrace
(
"%p sql result is freed by app"
,
pSql
);
}
tscPartiallyFreeSqlObj
(
pSql
);
tscTrace
(
"%p sql result is freed by app"
,
pSql
);
}
}
}
void
taos_free_result
(
TAOS_RES
*
res
)
{
taos_free_result_imp
(
res
,
0
);
}
// todo should not be used in async query
int
taos_errno
(
TAOS
*
taos
)
{
STscObj
*
pObj
=
(
STscObj
*
)
taos
;
...
...
src/client/src/tscSubquery.c
浏览文件 @
c307a201
...
...
@@ -1084,7 +1084,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
int16_t
bytes
=
0
;
int16_t
type
=
0
;
int
16
_t
inter
=
0
;
int
32
_t
inter
=
0
;
getResultDataInfo
(
s
.
type
,
s
.
bytes
,
TSDB_FUNC_TID_TAG
,
0
,
&
type
,
&
bytes
,
&
inter
,
0
,
0
);
...
...
@@ -1770,6 +1770,8 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
*/
pNew
->
fetchFp
=
pNew
->
fp
;
pSql
->
pSubs
[
i
]
=
pNew
;
pNew
->
fetchFp
=
pNew
->
fp
;
tscTrace
(
"%p sub:%p create subObj success. orderOfSub:%d"
,
pSql
,
pNew
,
i
);
}
...
...
src/client/src/tscUtil.c
浏览文件 @
c307a201
...
...
@@ -421,7 +421,6 @@ void tscFreeSqlObj(SSqlObj* pSql) {
memset
(
pCmd
->
payload
,
0
,
(
size_t
)
pCmd
->
allocSize
);
tfree
(
pCmd
->
payload
);
pCmd
->
allocSize
=
0
;
tfree
(
pSql
->
sqlstr
);
...
...
@@ -1033,7 +1032,7 @@ SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functi
return
pExpr
;
}
int32_t
tscSqlExprNumOfExprs
(
SQueryInfo
*
pQueryInfo
)
{
size_t
tscSqlExprNumOfExprs
(
SQueryInfo
*
pQueryInfo
)
{
return
taosArrayGetSize
(
pQueryInfo
->
exprList
);
}
...
...
@@ -1352,7 +1351,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId) {
return
false
;
}
if
(
colId
==
-
1
&&
UTIL_TABLE_IS_SUPER_TABLE
(
pTableMetaInfo
)
)
{
if
(
colId
==
TSDB_TBNAME_COLUMN_INDEX
)
{
return
true
;
}
...
...
@@ -1768,11 +1767,12 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
pNewQueryInfo
->
limit
=
pQueryInfo
->
limit
;
pNewQueryInfo
->
slimit
=
pQueryInfo
->
slimit
;
pNewQueryInfo
->
order
=
pQueryInfo
->
order
;
pNewQueryInfo
->
clauseLimit
=
pQueryInfo
->
clauseLimit
;
pNewQueryInfo
->
pTableMetaInfo
=
NULL
;
pNewQueryInfo
->
tsBuf
=
NULL
;
pNewQueryInfo
->
fillType
=
pQueryInfo
->
fillType
;
pNewQueryInfo
->
fillVal
=
NULL
;
pNewQueryInfo
->
clauseLimit
=
pQueryInfo
->
clauseLimit
;
pNewQueryInfo
->
numOfTables
=
0
;
pNewQueryInfo
->
tsBuf
=
NULL
;
pNewQueryInfo
->
pTableMetaInfo
=
NULL
;
pNewQueryInfo
->
groupbyExpr
=
pQueryInfo
->
groupbyExpr
;
if
(
pQueryInfo
->
groupbyExpr
.
columnInfo
!=
NULL
)
{
...
...
@@ -1864,7 +1864,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
}
if
(
pFinalInfo
->
pTableMeta
==
NULL
)
{
tscError
(
"%p new subquery failed for get
pMeter
Meta is NULL from cache"
,
pSql
);
tscError
(
"%p new subquery failed for get
table
Meta is NULL from cache"
,
pSql
);
tscFreeSqlObj
(
pNew
);
return
NULL
;
}
...
...
@@ -2011,7 +2011,7 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) {
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
assert
(
pRes
->
completed
);
// for normal table,
do not try any more if result are exhausted
// for normal table,
no need to try any more if results are all retrieved from one vnode
if
(
!
UTIL_TABLE_IS_SUPER_TABLE
(
pTableMetaInfo
)
||
(
pTableMetaInfo
->
vgroupList
==
NULL
))
{
return
false
;
}
...
...
@@ -2037,7 +2037,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
int32_t
totalVgroups
=
pTableMetaInfo
->
vgroupList
->
numOfVgroups
;
while
(
++
pTableMetaInfo
->
vgroupIndex
<
totalVgroups
)
{
tscTrace
(
"%p
current vnode:%d exhausted, try next:%d. total vnode
:%d. current numOfRes:%d"
,
pSql
,
tscTrace
(
"%p
results from vgroup index:%d completed, try next:%d. total vgroups
:%d. current numOfRes:%d"
,
pSql
,
pTableMetaInfo
->
vgroupIndex
-
1
,
pTableMetaInfo
->
vgroupIndex
,
totalVgroups
,
pRes
->
numOfClauseTotal
);
/*
...
...
@@ -2121,7 +2121,7 @@ void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t column
int32_t
type
=
pInfo
->
pSqlExpr
->
resType
;
int32_t
bytes
=
pInfo
->
pSqlExpr
->
resBytes
;
char
*
pData
=
((
char
*
)
pRes
->
data
)
+
pInfo
->
pSqlExpr
->
offset
*
pRes
->
numOfRows
+
bytes
*
pRes
->
row
;
char
*
pData
=
pRes
->
data
+
pInfo
->
pSqlExpr
->
offset
*
pRes
->
numOfRows
+
bytes
*
pRes
->
row
;
if
(
type
==
TSDB_DATA_TYPE_NCHAR
||
type
==
TSDB_DATA_TYPE_BINARY
)
{
int32_t
realLen
=
varDataLen
(
pData
);
...
...
@@ -2134,7 +2134,7 @@ void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t column
}
if
(
realLen
<
pInfo
->
pSqlExpr
->
resBytes
-
VARSTR_HEADER_SIZE
)
{
// todo refactor
*
(
char
*
)
(
pData
+
realLen
+
VARSTR_HEADER_SIZE
)
=
0
;
*
(
pData
+
realLen
+
VARSTR_HEADER_SIZE
)
=
0
;
}
pRes
->
length
[
columnIndex
]
=
realLen
;
...
...
src/common/src/tdataformat.c
浏览文件 @
c307a201
...
...
@@ -196,6 +196,7 @@ void * tdQueryTagByID(SDataRow row, int16_t colId, int16_t *type) {
STagCol
key
=
{
colId
,
0
,
0
};
STagCol
*
stCol
=
taosbsearch
(
&
key
,
pBase
,
nCols
,
sizeof
(
STagCol
),
compTagId
,
TD_EQ
);
if
(
NULL
==
stCol
)
{
type
=
TSDB_DATA_TYPE_NULL
;
return
NULL
;
}
...
...
src/common/src/ttypes.c
浏览文件 @
c307a201
...
...
@@ -32,6 +32,35 @@ const int32_t TYPE_BYTES[11] = {
sizeof
(
VarDataOffsetT
)
// TSDB_DATA_TYPE_NCHAR
};
static
void
getStatics_bool
(
const
TSKEY
*
primaryKey
,
const
void
*
pData
,
int32_t
numOfRow
,
int64_t
*
min
,
int64_t
*
max
,
int64_t
*
sum
,
int16_t
*
minIndex
,
int16_t
*
maxIndex
,
int16_t
*
numOfNull
)
{
int8_t
*
data
=
(
int8_t
*
)
pData
;
*
min
=
INT64_MAX
;
*
max
=
INT64_MIN
;
*
minIndex
=
0
;
*
maxIndex
=
0
;
ASSERT
(
numOfRow
<=
INT16_MAX
);
for
(
int32_t
i
=
0
;
i
<
numOfRow
;
++
i
)
{
if
(
isNull
((
char
*
)
&
data
[
i
],
TSDB_DATA_TYPE_BOOL
))
{
(
*
numOfNull
)
+=
1
;
continue
;
}
*
sum
+=
data
[
i
];
if
(
*
min
>
data
[
i
])
{
*
min
=
data
[
i
];
*
minIndex
=
i
;
}
if
(
*
max
<
data
[
i
])
{
*
max
=
data
[
i
];
*
maxIndex
=
i
;
}
}
}
static
void
getStatics_i8
(
const
TSKEY
*
primaryKey
,
const
void
*
pData
,
int32_t
numOfRow
,
int64_t
*
min
,
int64_t
*
max
,
int64_t
*
sum
,
int16_t
*
minIndex
,
int16_t
*
maxIndex
,
int16_t
*
numOfNull
)
{
int8_t
*
data
=
(
int8_t
*
)
pData
;
...
...
@@ -131,15 +160,6 @@ static void getStatics_i32(const TSKEY *primaryKey, const void *pData, int32_t n
*
max
=
data
[
i
];
*
maxIndex
=
i
;
}
// if (isNull(&lastVal, TSDB_DATA_TYPE_INT)) {
// lastKey = primaryKey[i];
// lastVal = data[i];
// } else {
// *wsum = lastVal * (primaryKey[i] - lastKey);
// lastKey = primaryKey[i];
// lastVal = data[i];
// }
}
}
...
...
@@ -279,11 +299,11 @@ static void getStatics_bin(const TSKEY *primaryKey, const void *pData, int32_t n
ASSERT
(
numOfRow
<=
INT16_MAX
);
for
(
int32_t
i
=
0
;
i
<
numOfRow
;
++
i
)
{
if
(
isNull
(
(
const
char
*
)
varDataVal
(
data
)
,
TSDB_DATA_TYPE_BINARY
))
{
if
(
isNull
(
data
,
TSDB_DATA_TYPE_BINARY
))
{
(
*
numOfNull
)
+=
1
;
}
data
+=
varDataLen
(
data
);
data
+=
varData
T
Len
(
data
);
}
*
sum
=
0
;
...
...
@@ -299,11 +319,11 @@ static void getStatics_nchr(const TSKEY *primaryKey, const void *pData, int32_t
ASSERT
(
numOfRow
<=
INT16_MAX
);
for
(
int32_t
i
=
0
;
i
<
numOfRow
;
++
i
)
{
if
(
isNull
(
(
const
char
*
)
varDataVal
(
data
)
,
TSDB_DATA_TYPE_NCHAR
))
{
if
(
isNull
(
data
,
TSDB_DATA_TYPE_NCHAR
))
{
(
*
numOfNull
)
+=
1
;
}
data
+=
varDataLen
(
data
);
data
+=
varData
T
Len
(
data
);
}
*
sum
=
0
;
...
...
@@ -315,7 +335,7 @@ static void getStatics_nchr(const TSKEY *primaryKey, const void *pData, int32_t
tDataTypeDescriptor
tDataTypeDesc
[
11
]
=
{
{
TSDB_DATA_TYPE_NULL
,
6
,
1
,
"NOTYPE"
,
NULL
,
NULL
,
NULL
},
{
TSDB_DATA_TYPE_BOOL
,
4
,
CHAR_BYTES
,
"BOOL"
,
tsCompressBool
,
tsDecompressBool
,
getStatics_
i8
},
{
TSDB_DATA_TYPE_BOOL
,
4
,
CHAR_BYTES
,
"BOOL"
,
tsCompressBool
,
tsDecompressBool
,
getStatics_
bool
},
{
TSDB_DATA_TYPE_TINYINT
,
7
,
CHAR_BYTES
,
"TINYINT"
,
tsCompressTinyint
,
tsDecompressTinyint
,
getStatics_i8
},
{
TSDB_DATA_TYPE_SMALLINT
,
8
,
SHORT_BYTES
,
"SMALLINT"
,
tsCompressSmallint
,
tsDecompressSmallint
,
getStatics_i16
},
{
TSDB_DATA_TYPE_INT
,
3
,
INT_BYTES
,
"INT"
,
tsCompressInt
,
tsDecompressInt
,
getStatics_i32
},
...
...
src/inc/taosdef.h
浏览文件 @
c307a201
...
...
@@ -293,9 +293,9 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_MAX_COMP_LEVEL 2
#define TSDB_DEFAULT_COMP_LEVEL 2
#define TSDB_MIN_WAL_LEVEL
0
#define TSDB_MAX_WAL_LEVEL 2
#define TSDB_DEFAULT_WAL_LEVEL
2
#define TSDB_MIN_WAL_LEVEL
1
#define TSDB_MAX_WAL_LEVEL
2
#define TSDB_DEFAULT_WAL_LEVEL
1
#define TSDB_MIN_REPLICA_NUM 1
#define TSDB_MAX_REPLICA_NUM 3
...
...
src/inc/taosmsg.h
浏览文件 @
c307a201
...
...
@@ -370,7 +370,7 @@ typedef struct SExprInfo {
struct
tExprNode
*
pExpr
;
int16_t
bytes
;
int16_t
type
;
int
16
_t
interBytes
;
int
32
_t
interBytes
;
}
SExprInfo
;
typedef
struct
SColumnFilterInfo
{
...
...
@@ -620,13 +620,6 @@ typedef struct {
SCMVgroupInfo
vgroups
[];
}
SVgroupsInfo
;
//typedef struct {
// int32_t numOfTables;
// int32_t join;
// int32_t joinCondLen; // for join condition
// int32_t metaElem[TSDB_MAX_JOIN_TABLE_NUM];
//} SSuperTableMetaMsg;
typedef
struct
STableMetaMsg
{
int32_t
contLen
;
char
tableId
[
TSDB_TABLE_ID_LEN
+
1
];
// table id
...
...
src/inc/tsdb.h
浏览文件 @
c307a201
...
...
@@ -200,6 +200,10 @@ TsdbQueryHandleT *tsdbQueryTables(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STable
*/
TsdbQueryHandleT
tsdbQueryLastRow
(
TsdbRepoT
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupInfo
);
SArray
*
tsdbGetQueriedTableIdList
(
TsdbQueryHandleT
*
pHandle
);
TsdbQueryHandleT
tsdbQueryRowsInExternalWindow
(
TsdbRepoT
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
);
/**
* move to next block if exists
*
...
...
src/mnode/src/mnodeDb.c
浏览文件 @
c307a201
...
...
@@ -276,8 +276,8 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) {
return
TSDB_CODE_INVALID_OPTION
;
}
if
(
pCfg
->
replications
>
1
&&
pCfg
->
walLevel
<=
TSDB_MIN_WAL_LEVEL
)
{
mError
(
"invalid db option walLevel:%d must
> 0, while replica:%d > 1"
,
pCfg
->
walLevel
,
pCfg
->
replications
);
if
(
pCfg
->
walLevel
<
TSDB_MIN_WAL_LEVEL
)
{
mError
(
"invalid db option walLevel:%d must
be greater than 0"
,
pCfg
->
walLevel
);
return
TSDB_CODE_INVALID_OPTION
;
}
...
...
@@ -871,8 +871,8 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
mTrace
(
"db:%s, replications:%d change to %d"
,
pDb
->
name
,
pDb
->
cfg
.
replications
,
replications
);
newCfg
.
replications
=
replications
;
if
(
replications
>
1
&&
pDb
->
cfg
.
walLevel
<=
TSDB_MIN_WAL_LEVEL
)
{
mError
(
"db:%s, walLevel:%d must
> 0, while replica:%d > 1"
,
pDb
->
name
,
pDb
->
cfg
.
walLevel
,
replications
);
if
(
pDb
->
cfg
.
walLevel
<
TSDB_MIN_WAL_LEVEL
)
{
mError
(
"db:%s, walLevel:%d must
be greater than 0"
,
pDb
->
name
,
pDb
->
cfg
.
walLevel
);
terrno
=
TSDB_CODE_INVALID_OPTION
;
}
...
...
src/query/inc/qExecutor.h
浏览文件 @
c307a201
...
...
@@ -28,21 +28,16 @@
#include "tsdb.h"
#include "tsqlfunction.h"
//typedef struct tFilePage {
// int64_t num;
// char data[];
//} tFilePage;
struct
SColumnFilterElem
;
typedef
bool
(
*
__filter_func_t
)(
struct
SColumnFilterElem
*
pFilter
,
char
*
val1
,
char
*
val2
);
typedef
int32_t
(
*
__block_search_fn_t
)(
char
*
data
,
int32_t
num
,
int64_t
key
,
int32_t
order
);
typedef
struct
SSqlGroupbyExpr
{
int16_t
tableIndex
;
SArray
*
columnInfo
;
// SArray<SColIndex>, group by columns information
int16_t
numOfGroupCols
;
int16_t
orderIndex
;
// order by column index
int16_t
orderType
;
// order by type: asc/desc
int16_t
tableIndex
;
SArray
*
columnInfo
;
// SArray<SColIndex>, group by columns information
int16_t
numOfGroupCols
;
int16_t
orderIndex
;
// order by column index
int16_t
orderType
;
// order by type: asc/desc
}
SSqlGroupbyExpr
;
typedef
struct
SPosInfo
{
...
...
@@ -62,25 +57,27 @@ typedef struct SWindowResult {
SWindowStatus
status
;
// this result status: closed or opened
}
SWindowResult
;
/**
* If the number of generated results is greater than this value,
* query query will be halt and return results to client immediate.
*/
typedef
struct
SResultRec
{
int64_t
total
;
// total generated result size in rows
int64_t
rows
;
// current result set size in rows
int64_t
capacity
;
// capacity of current result output buffer
// result size threshold in rows. If the result buffer is larger than this, pause query and return to client
int32_t
threshold
;
int64_t
total
;
// total generated result size in rows
int64_t
rows
;
// current result set size in rows
int64_t
capacity
;
// capacity of current result output buffer
int32_t
threshold
;
// result size threshold in rows.
}
SResultRec
;
typedef
struct
SWindowResInfo
{
SWindowResult
*
pResult
;
// result list
void
*
hashList
;
// hash list for quick access
SHashObj
*
hashList
;
// hash list for quick access
int16_t
type
;
// data type for hash key
int32_t
capacity
;
// max capacity
int32_t
curIndex
;
// current start active index
int32_t
size
;
// number of result set
int64_t
startTime
;
// start time of the first time window for sliding query
int64_t
prevSKey
;
// previous (not completed) sliding window start key
int64_t
threshold
;
// threshold to
pausing query and return clos
ed results.
int64_t
threshold
;
// threshold to
halt query and return the generat
ed results.
}
SWindowResInfo
;
typedef
struct
SColumnFilterElem
{
...
...
@@ -90,98 +87,111 @@ typedef struct SColumnFilterElem {
}
SColumnFilterElem
;
typedef
struct
SSingleColumnFilterInfo
{
SColumnInfo
info
;
void
*
pData
;
int32_t
numOfFilters
;
SColumnInfo
info
;
SColumnFilterElem
*
pFilters
;
void
*
pData
;
}
SSingleColumnFilterInfo
;
typedef
struct
STableQueryInfo
{
// todo merge with the STableQueryInfo struct
int32_t
tableIndex
;
int32_t
groupI
dx
;
// group id in table list
int32_t
groupI
ndex
;
// group id in table list
TSKEY
lastKey
;
int32_t
numOfRes
;
int16_t
queryRangeSet
;
// denote if the query range is set, only available for interval query
int64_t
tag
;
STimeWindow
win
;
STSCursor
cur
;
STableId
id
;
// for retrieve the page id list
STableId
id
;
// for retrieve the page id list
SWindowResInfo
windowResInfo
;
}
STableQueryInfo
;
typedef
struct
SQueryCostSummary
{
}
SQueryCostSummary
;
typedef
struct
SQueryCostInfo
{
uint64_t
loadStatisTime
;
uint64_t
loadFileBlockTime
;
uint64_t
loadDataInCacheTime
;
uint64_t
loadStatisSize
;
uint64_t
loadFileBlockSize
;
uint64_t
loadDataInCacheSize
;
uint64_t
loadDataTime
;
uint64_t
dataInRows
;
uint64_t
checkRows
;
uint32_t
dataBlocks
;
uint32_t
loadBlockStatis
;
uint32_t
discardBlocks
;
}
SQueryCostInfo
;
typedef
struct
SGroupItem
{
STableId
id
;
STableId
id
;
STableQueryInfo
*
info
;
}
SGroupItem
;
typedef
struct
SQuery
{
int16_t
numOfCols
;
int16_t
numOfTags
;
S
OrderVal
order
;
STimeWindow
window
;
int64_t
intervalTime
;
int64_t
slidingTime
;
// sliding time for sliding window query
char
slidingTimeUnit
;
// interval data type, used for daytime revise
int
8_t
precision
;
int16_t
numOfOutput
;
int16_t
fillType
;
int16_t
checkBuffer
;
// check if the buffer is full during scan each block
SLimitVal
limit
;
int32_t
rowSize
;
S
SqlGroupbyExpr
*
pGroupby
Expr
;
S
ExprInfo
*
pSelectExpr
;
SColumnInfo
*
c
olList
;
SColumnInfo
*
tagColList
;
int
32_t
numOfFilterCols
;
int64_t
*
fillVal
;
uint32_t
status
;
// query status
SResultRec
rec
;
int32_t
pos
;
tFilePage
**
sdata
;
STableQueryInfo
*
current
;
int16_t
numOfCols
;
int16_t
numOfTags
;
SOrderVal
order
;
S
TimeWindow
window
;
int64_t
intervalTime
;
int64_t
slidingTime
;
// sliding time for sliding window query
char
slidingTimeUnit
;
// interval data type, used for daytime revise
int8_t
precision
;
int
16_t
numOfOutput
;
int16_t
fillType
;
int16_t
checkBuffer
;
// check if the buffer is full during scan each block
SLimitVal
limit
;
int32_t
rowSize
;
SSqlGroupbyExpr
*
pGroupbyExpr
;
S
ExprInfo
*
pSelect
Expr
;
S
ColumnInfo
*
colList
;
SColumnInfo
*
tagC
olList
;
int32_t
numOfFilterCols
;
int
64_t
*
fillVal
;
uint32_t
status
;
// query status
SResultRec
rec
;
int32_t
pos
;
tFilePage
**
sdata
;
STableQueryInfo
*
current
;
SSingleColumnFilterInfo
*
pFilterInfo
;
}
SQuery
;
typedef
struct
SQueryRuntimeEnv
{
SResultInfo
*
resultInfo
;
// todo refactor to merge with SWindowResInfo
SQuery
*
pQuery
;
SQLFunctionCtx
*
pCtx
;
int16_t
numOfRowsPerPage
;
int16_t
offset
[
TSDB_MAX_COLUMNS
];
uint16_t
scanFlag
;
// denotes reversed scan of data or not
SFillInfo
*
pFillInfo
;
SWindowResInfo
windowResInfo
;
STSBuf
*
pTSBuf
;
STSCursor
cur
;
SQueryCost
Summary
summary
;
bool
stableQuery
;
// super table query or not
void
*
pQueryHandle
;
void
*
pSecQueryHandle
;
// another thread for
SDiskbasedResultBuf
*
pResultBuf
;
// query result buffer based on blocked-wised disk file
SResultInfo
*
resultInfo
;
// todo refactor to merge with SWindowResInfo
SQuery
*
pQuery
;
SQLFunctionCtx
*
pCtx
;
int16_t
numOfRowsPerPage
;
int16_t
offset
[
TSDB_MAX_COLUMNS
];
uint16_t
scanFlag
;
// denotes reversed scan of data or not
SFillInfo
*
pFillInfo
;
SWindowResInfo
windowResInfo
;
STSBuf
*
pTSBuf
;
STSCursor
cur
;
SQueryCost
Info
summary
;
bool
stableQuery
;
// super table query or not
void
*
pQueryHandle
;
void
*
pSecQueryHandle
;
// another thread for
SDiskbasedResultBuf
*
pResultBuf
;
// query result buffer based on blocked-wised disk file
}
SQueryRuntimeEnv
;
typedef
struct
SQInfo
{
void
*
signature
;
TSKEY
startTime
;
TSKEY
elapsedTime
;
int32_t
pointsInterpo
;
int32_t
code
;
// error code to returned to client
sem_t
dataReady
;
void
*
tsdb
;
int32_t
vgId
;
void
*
signature
;
TSKEY
startTime
;
TSKEY
elapsedTime
;
int32_t
pointsInterpo
;
int32_t
code
;
// error code to returned to client
sem_t
dataReady
;
void
*
tsdb
;
int32_t
vgId
;
STableGroupInfo
tableIdGroupInfo
;
// table id list < only includes the STableId list>
STableGroupInfo
groupInfo
;
//
SQueryRuntimeEnv
runtimeEnv
;
int32_t
groupIndex
;
int32_t
offset
;
// offset in group result set of subgroup, todo refactor
int32_t
offset
;
// offset in group result set of subgroup, todo refactor
SArray
*
arrTableIdInfo
;
T_REF_DECLARE
()
/*
* the query is executed position on which meter of the whole list.
...
...
@@ -189,8 +199,8 @@ typedef struct SQInfo {
* We later may refactor to remove this attribution by using another flag to denote
* whether a multimeter query is completed or not.
*/
int32_t
tableIndex
;
int32_t
numOfGroupResultPages
;
int32_t
tableIndex
;
int32_t
numOfGroupResultPages
;
}
SQInfo
;
#endif // TDENGINE_QUERYEXECUTOR_H
src/query/inc/qextbuffer.h
浏览文件 @
c307a201
...
...
@@ -28,8 +28,7 @@ extern "C" {
#include "tdataformat.h"
#include "talgo.h"
#define DEFAULT_PAGE_SIZE 16384 // 16k larger than the SHistoInfo
#define MIN_BUFFER_SIZE (1 << 19)
#define DEFAULT_PAGE_SIZE (1024L*56) // 16k larger than the SHistoInfo
#define MAX_TMPFILE_PATH_LENGTH PATH_MAX
#define INITIAL_ALLOCATION_BUFFER_SIZE 64
...
...
src/query/inc/qfill.h
浏览文件 @
c307a201
...
...
@@ -45,12 +45,13 @@ typedef struct SFillInfo {
int32_t
numOfCols
;
// number of columns, including the tags columns
int32_t
rowSize
;
// size of each row
char
**
pTags
;
// tags value for current interpolation
int64_t
slidingTime
;
// sliding value to determine the number of result for a given time window
int64_t
slidingTime
;
// sliding value to determine the number of result for a given time window
char
*
prevValues
;
// previous row of data, to generate the interpolation results
char
*
nextValues
;
// next row of data
char
**
pData
;
// original result data block involved in filling data
int32_t
capacityInRows
;
// data buffer size in rows
SFillColInfo
*
pFillCol
;
// column info for fill operations
char
**
pData
;
// original result data block involved in filling data
}
SFillInfo
;
typedef
struct
SPoint
{
...
...
src/query/inc/qresultBuf.h
浏览文件 @
c307a201
...
...
@@ -44,6 +44,8 @@ typedef struct SDiskbasedResultBuf {
SIDList
*
list
;
// for each id, there is a page id list
}
SDiskbasedResultBuf
;
#define DEFAULT_INTERN_BUF_PAGE_SIZE (8192L*5)
/**
* create disk-based result buffer
* @param pResultBuf
...
...
src/query/inc/tsqlfunction.h
浏览文件 @
c307a201
...
...
@@ -161,26 +161,24 @@ typedef struct SExtTagsInfo {
// sql function runtime context
typedef
struct
SQLFunctionCtx
{
int32_t
startOffset
;
int32_t
size
;
// number of rows
uint32_t
order
;
// asc|desc
uint32_t
scanFlag
;
// TODO merge with currentStage
int16_t
inputType
;
int16_t
inputBytes
;
int16_t
outputType
;
int16_t
outputBytes
;
// size of results, determined by function and input column data type
bool
hasNull
;
// null value exist in current block
int16_t
functionId
;
// function id
void
*
aInputElemBuf
;
char
*
aOutputBuf
;
// final result output buffer, point to sdata->data
uint8_t
currentStage
;
// record current running step, default: 0
int64_t
nStartQueryTimestamp
;
// timestamp range of current query when function is executed on a specific data block
int32_t
numOfParams
;
tVariant
param
[
4
];
// input parameter, e.g., top(k, 20), the number of results for top query is kept in param */
int64_t
*
ptsList
;
// corresponding timestamp array list
void
*
ptsOutputBuf
;
// corresponding output buffer for timestamp of each result, e.g., top/bottom*/
int32_t
startOffset
;
int32_t
size
;
// number of rows
uint32_t
order
;
// asc|desc
int16_t
inputType
;
int16_t
inputBytes
;
int16_t
outputType
;
int16_t
outputBytes
;
// size of results, determined by function and input column data type
bool
hasNull
;
// null value exist in current block
int16_t
functionId
;
// function id
void
*
aInputElemBuf
;
char
*
aOutputBuf
;
// final result output buffer, point to sdata->data
uint8_t
currentStage
;
// record current running step, default: 0
int64_t
nStartQueryTimestamp
;
// timestamp range of current query when function is executed on a specific data block
int32_t
numOfParams
;
tVariant
param
[
4
];
// input parameter, e.g., top(k, 20), the number of results for top query is kept in param */
int64_t
*
ptsList
;
// corresponding timestamp array list
void
*
ptsOutputBuf
;
// corresponding output buffer for timestamp of each result, e.g., top/bottom*/
SQLPreAggVal
preAggVals
;
tVariant
tag
;
SResultInfo
*
resultInfo
;
...
...
@@ -219,7 +217,7 @@ typedef struct SQLAggFuncElem {
#define GET_RES_INFO(ctx) ((ctx)->resultInfo)
int32_t
getResultDataInfo
(
int32_t
dataType
,
int32_t
dataBytes
,
int32_t
functionId
,
int32_t
param
,
int16_t
*
type
,
int16_t
*
len
,
int
16
_t
*
interBytes
,
int16_t
extLength
,
bool
isSuperTable
);
int16_t
*
len
,
int
32
_t
*
interBytes
,
int16_t
extLength
,
bool
isSuperTable
);
#define IS_STREAM_QUERY_VALID(x) (((x)&TSDB_FUNCSTATE_STREAM) != 0)
#define IS_MULTIOUTPUT(x) (((x)&TSDB_FUNCSTATE_MO) != 0)
...
...
@@ -239,7 +237,7 @@ enum {
/* determine the real data need to calculated the result */
enum
{
BLK_DATA_NO_NEEDED
=
0x0
,
BLK_DATA_
FILED
S_NEEDED
=
0x1
,
BLK_DATA_
STATI
S_NEEDED
=
0x1
,
BLK_DATA_ALL_NEEDED
=
0x3
,
};
...
...
@@ -269,9 +267,6 @@ extern struct SQLAggFuncElem aAggs[];
/* compatible check array list */
extern
int32_t
funcCompatDefList
[];
void
getStatistics
(
char
*
priData
,
char
*
data
,
int32_t
size
,
int32_t
numOfRow
,
int32_t
type
,
int64_t
*
min
,
int64_t
*
max
,
int64_t
*
sum
,
int16_t
*
minIndex
,
int16_t
*
maxIndex
,
int32_t
*
numOfNull
);
bool
top_bot_datablock_filter
(
SQLFunctionCtx
*
pCtx
,
int32_t
functionId
,
char
*
minval
,
char
*
maxval
);
bool
stableQueryFunctChanged
(
int32_t
funcId
);
...
...
src/query/inc/tvariant.h
浏览文件 @
c307a201
...
...
@@ -48,7 +48,7 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc);
int32_t
tVariantToString
(
tVariant
*
pVar
,
char
*
dst
);
int32_t
tVariantDump
(
tVariant
*
pVariant
,
char
*
payload
,
char
type
);
int32_t
tVariantDump
(
tVariant
*
pVariant
,
char
*
payload
,
int16_t
type
,
bool
includeLengthPrefix
);
int32_t
tVariantTypeSetType
(
tVariant
*
pVariant
,
char
type
);
...
...
src/query/src/qExecutor.c
浏览文件 @
c307a201
此差异已折叠。
点击以展开。
src/query/src/qFilterFunc.c
浏览文件 @
c307a201
...
...
@@ -209,7 +209,7 @@ bool like_str(SColumnFilterElem *pFilter, char *minval, char *maxval) {
bool
like_nchar
(
SColumnFilterElem
*
pFilter
,
char
*
minval
,
char
*
maxval
)
{
SPatternCompareInfo
info
=
PATTERN_COMPARE_INFO_INITIALIZER
;
return
WCSPatternMatch
((
wchar_t
*
)
pFilter
->
filterInfo
.
pz
,
varDataVal
(
minval
),
varDataLen
(
minval
)
/
TSDB_NCHAR_SIZE
,
&
info
)
==
TSDB_PATTERN_MATCH
;
return
WCSPatternMatch
((
wchar_t
*
)
pFilter
->
filterInfo
.
pz
,
varDataVal
(
minval
),
varDataLen
(
minval
)
/
TSDB_NCHAR_SIZE
,
&
info
)
==
TSDB_PATTERN_MATCH
;
}
////////////////////////////////////////////////////////////////
...
...
src/query/src/qUtil.c
浏览文件 @
c307a201
...
...
@@ -137,11 +137,10 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
for
(
int32_t
k
=
0
;
k
<
pWindowResInfo
->
size
;
++
k
)
{
SWindowResult
*
pResult
=
&
pWindowResInfo
->
pResult
[
k
];
int32_t
*
p
=
(
int32_t
*
)
taosHashGet
(
pWindowResInfo
->
hashList
,
(
const
char
*
)
&
pResult
->
window
.
skey
,
TSDB_KEYSIZE
);
int32_t
v
=
(
*
p
-
num
);
assert
(
v
>=
0
&&
v
<=
pWindowResInfo
->
size
);
taosHashPut
(
pWindowResInfo
->
hashList
,
(
const
char
*
)
&
pResult
->
window
.
skey
,
TSDB_KEYSIZE
,
(
char
*
)
&
v
,
sizeof
(
int32_t
));
taosHashPut
(
pWindowResInfo
->
hashList
,
(
char
*
)
&
pResult
->
window
.
skey
,
TSDB_KEYSIZE
,
(
char
*
)
&
v
,
sizeof
(
int32_t
));
}
pWindowResInfo
->
curIndex
=
-
1
;
...
...
src/query/src/qfill.c
浏览文件 @
c307a201
...
...
@@ -79,7 +79,7 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_
int32_t
rowsize
=
0
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
int32_t
bytes
=
pFillInfo
->
pFillCol
[
i
].
col
.
bytes
;
pFillInfo
->
pData
[
i
]
=
calloc
(
1
,
sizeof
(
tFilePage
)
+
bytes
*
capacity
);
pFillInfo
->
pData
[
i
]
=
calloc
(
1
,
bytes
*
capacity
);
rowsize
+=
bytes
;
}
...
...
@@ -89,6 +89,8 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_
}
pFillInfo
->
rowSize
=
rowsize
;
pFillInfo
->
capacityInRows
=
capacity
;
return
pFillInfo
;
}
...
...
@@ -119,6 +121,17 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey)
pFillInfo
->
rowIdx
=
0
;
pFillInfo
->
endKey
=
endKey
;
pFillInfo
->
numOfRows
=
numOfRows
;
// ensure the space
if
(
pFillInfo
->
capacityInRows
<
numOfRows
)
{
for
(
int32_t
i
=
0
;
i
<
pFillInfo
->
numOfCols
;
++
i
)
{
char
*
tmp
=
realloc
(
pFillInfo
->
pData
[
i
],
numOfRows
*
pFillInfo
->
pFillCol
[
i
].
col
.
bytes
);
assert
(
tmp
!=
NULL
);
// todo handle error
memset
(
tmp
,
0
,
numOfRows
*
pFillInfo
->
pFillCol
[
i
].
col
.
bytes
);
pFillInfo
->
pData
[
i
]
=
tmp
;
}
}
}
void
taosFillCopyInputDataFromFilePage
(
SFillInfo
*
pFillInfo
,
tFilePage
**
pInput
)
{
...
...
@@ -474,11 +487,11 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu
}
int64_t
taosGenerateDataBlock
(
SFillInfo
*
pFillInfo
,
tFilePage
**
output
,
int32_t
capacity
)
{
int32_t
remain
=
taosNumOfRemainRows
(
pFillInfo
);
// todo use iterator?
int32_t
rows
=
taosGetNumOfResultWithFill
(
pFillInfo
,
remain
,
pFillInfo
->
endKey
,
capacity
);
int32_t
remain
=
taosNumOfRemainRows
(
pFillInfo
);
// todo use iterator?
int32_t
rows
=
taosGetNumOfResultWithFill
(
pFillInfo
,
remain
,
pFillInfo
->
endKey
,
capacity
);
int32_t
numOfRes
=
generateDataBlockImpl
(
pFillInfo
,
output
,
remain
,
rows
,
pFillInfo
->
pData
);
assert
(
numOfRes
==
rows
);
int32_t
numOfRes
=
generateDataBlockImpl
(
pFillInfo
,
output
,
remain
,
rows
,
pFillInfo
->
pData
);
assert
(
numOfRes
==
rows
);
return
numOfRes
;
return
numOfRes
;
}
src/query/src/qresultBuf.c
浏览文件 @
c307a201
...
...
@@ -5,14 +5,12 @@
#include "tsqlfunction.h"
#include "queryLog.h"
#define DEFAULT_INTERN_BUF_SIZE 16384L
int32_t
createDiskbasedResultBuffer
(
SDiskbasedResultBuf
**
pResultBuf
,
int32_t
size
,
int32_t
rowSize
,
void
*
handle
)
{
SDiskbasedResultBuf
*
pResBuf
=
calloc
(
1
,
sizeof
(
SDiskbasedResultBuf
));
pResBuf
->
numOfRowsPerPage
=
(
DEFAULT_INTERN_BUF_SIZE
-
sizeof
(
tFilePage
))
/
rowSize
;
pResBuf
->
numOfRowsPerPage
=
(
DEFAULT_INTERN_BUF_
PAGE_
SIZE
-
sizeof
(
tFilePage
))
/
rowSize
;
pResBuf
->
numOfPages
=
size
;
pResBuf
->
totalBufSize
=
pResBuf
->
numOfPages
*
DEFAULT_INTERN_BUF_SIZE
;
pResBuf
->
totalBufSize
=
pResBuf
->
numOfPages
*
DEFAULT_INTERN_BUF_
PAGE_
SIZE
;
pResBuf
->
incStep
=
4
;
// init id hash table
...
...
@@ -33,7 +31,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t si
return
TSDB_CODE_CLI_NO_DISKSPACE
;
}
int32_t
ret
=
ftruncate
(
pResBuf
->
fd
,
pResBuf
->
numOfPages
*
DEFAULT_INTERN_BUF_SIZE
);
int32_t
ret
=
ftruncate
(
pResBuf
->
fd
,
pResBuf
->
numOfPages
*
DEFAULT_INTERN_BUF_
PAGE_
SIZE
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to create tmp file: %s on disk. %s"
,
pResBuf
->
path
,
strerror
(
errno
));
return
TSDB_CODE_CLI_NO_DISKSPACE
;
...
...
@@ -55,7 +53,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t si
tFilePage
*
getResultBufferPageById
(
SDiskbasedResultBuf
*
pResultBuf
,
int32_t
id
)
{
assert
(
id
<
pResultBuf
->
numOfPages
&&
id
>=
0
);
return
(
tFilePage
*
)(
pResultBuf
->
pBuf
+
DEFAULT_INTERN_BUF_SIZE
*
id
);
return
(
tFilePage
*
)(
pResultBuf
->
pBuf
+
DEFAULT_INTERN_BUF_
PAGE_
SIZE
*
id
);
}
int32_t
getNumOfResultBufGroupId
(
SDiskbasedResultBuf
*
pResultBuf
)
{
return
taosHashGetSize
(
pResultBuf
->
idsTable
);
}
...
...
@@ -63,7 +61,7 @@ int32_t getNumOfResultBufGroupId(SDiskbasedResultBuf* pResultBuf) { return taosH
int32_t
getResBufSize
(
SDiskbasedResultBuf
*
pResultBuf
)
{
return
pResultBuf
->
totalBufSize
;
}
static
int32_t
extendDiskFileSize
(
SDiskbasedResultBuf
*
pResultBuf
,
int32_t
numOfPages
)
{
assert
(
pResultBuf
->
numOfPages
*
DEFAULT_INTERN_BUF_SIZE
==
pResultBuf
->
totalBufSize
);
assert
(
pResultBuf
->
numOfPages
*
DEFAULT_INTERN_BUF_
PAGE_
SIZE
==
pResultBuf
->
totalBufSize
);
int32_t
ret
=
munmap
(
pResultBuf
->
pBuf
,
pResultBuf
->
totalBufSize
);
pResultBuf
->
numOfPages
+=
numOfPages
;
...
...
@@ -72,14 +70,14 @@ static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t numOf
* disk-based output buffer is exhausted, try to extend the disk-based buffer, the available disk space may
* be insufficient
*/
ret
=
ftruncate
(
pResultBuf
->
fd
,
pResultBuf
->
numOfPages
*
DEFAULT_INTERN_BUF_SIZE
);
ret
=
ftruncate
(
pResultBuf
->
fd
,
pResultBuf
->
numOfPages
*
DEFAULT_INTERN_BUF_
PAGE_
SIZE
);
if
(
ret
!=
0
)
{
// dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile,
// strerror(errno));
return
-
TSDB_CODE_SERV_NO_DISKSPACE
;
}
pResultBuf
->
totalBufSize
=
pResultBuf
->
numOfPages
*
DEFAULT_INTERN_BUF_SIZE
;
pResultBuf
->
totalBufSize
=
pResultBuf
->
numOfPages
*
DEFAULT_INTERN_BUF_
PAGE_
SIZE
;
pResultBuf
->
pBuf
=
mmap
(
NULL
,
pResultBuf
->
totalBufSize
,
PROT_READ
|
PROT_WRITE
,
MAP_SHARED
,
pResultBuf
->
fd
,
0
);
if
(
pResultBuf
->
pBuf
==
MAP_FAILED
)
{
...
...
@@ -174,7 +172,7 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32
tFilePage
*
page
=
getResultBufferPageById
(
pResultBuf
,
*
pageId
);
// clear memory for the new page
memset
(
page
,
0
,
DEFAULT_INTERN_BUF_SIZE
);
memset
(
page
,
0
,
DEFAULT_INTERN_BUF_
PAGE_
SIZE
);
return
page
;
}
...
...
src/query/src/tvariant.c
浏览文件 @
c307a201
...
...
@@ -363,8 +363,6 @@ static int32_t toBinary(tVariant *pVariant, char **pDest, int32_t *pDestSize) {
taosUcs4ToMbs
(
pVariant
->
wpz
,
newSize
,
pBuf
);
free
(
pVariant
->
wpz
);
/* terminated string */
pBuf
[
newSize
]
=
0
;
}
else
{
taosUcs4ToMbs
(
pVariant
->
wpz
,
newSize
,
*
pDest
);
...
...
@@ -598,7 +596,7 @@ static int32_t convertToBool(tVariant *pVariant, int64_t *pDest) {
*
* todo handle the return value
*/
int32_t
tVariantDump
(
tVariant
*
pVariant
,
char
*
payload
,
char
type
)
{
int32_t
tVariantDump
(
tVariant
*
pVariant
,
char
*
payload
,
int16_t
type
,
bool
includeLengthPrefix
)
{
if
(
pVariant
==
NULL
||
(
pVariant
->
nType
!=
0
&&
!
isValidDataType
(
pVariant
->
nType
,
pVariant
->
nLen
)))
{
return
-
1
;
}
...
...
@@ -765,13 +763,30 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, char type) {
}
case
TSDB_DATA_TYPE_BINARY
:
{
if
(
pVariant
->
nType
==
TSDB_DATA_TYPE_NULL
)
{
*
payload
=
TSDB_DATA_BINARY_NULL
;
if
(
!
includeLengthPrefix
)
{
if
(
pVariant
->
nType
==
TSDB_DATA_TYPE_NULL
)
{
*
(
uint8_t
*
)
payload
=
TSDB_DATA_BINARY_NULL
;
}
else
{
if
(
pVariant
->
nType
!=
TSDB_DATA_TYPE_BINARY
)
{
toBinary
(
pVariant
,
&
payload
,
&
pVariant
->
nLen
);
}
else
{
strncpy
(
payload
,
pVariant
->
pz
,
pVariant
->
nLen
);
}
}
}
else
{
if
(
pVariant
->
nType
!=
TSDB_DATA_TYPE_BINARY
)
{
toBinary
(
pVariant
,
&
payload
,
&
pVariant
->
nLen
);
if
(
pVariant
->
nType
==
TSDB_DATA_TYPE_NULL
)
{
setVardataNull
(
payload
,
TSDB_DATA_TYPE_BINARY
);
}
else
{
strncpy
(
payload
,
pVariant
->
pz
,
pVariant
->
nLen
);
char
*
p
=
varDataVal
(
payload
);
if
(
pVariant
->
nType
!=
TSDB_DATA_TYPE_BINARY
)
{
toBinary
(
pVariant
,
&
p
,
&
pVariant
->
nLen
);
}
else
{
strncpy
(
p
,
pVariant
->
pz
,
pVariant
->
nLen
);
}
varDataSetLen
(
payload
,
pVariant
->
nLen
);
assert
(
p
==
varDataVal
(
payload
));
}
}
break
;
...
...
@@ -785,15 +800,33 @@ int32_t tVariantDump(tVariant *pVariant, char *payload, char type) {
break
;
}
case
TSDB_DATA_TYPE_NCHAR
:
{
if
(
pVariant
->
nType
==
TSDB_DATA_TYPE_NULL
)
{
*
(
uint32_t
*
)
payload
=
TSDB_DATA_NCHAR_NULL
;
if
(
!
includeLengthPrefix
)
{
if
(
pVariant
->
nType
==
TSDB_DATA_TYPE_NULL
)
{
*
(
uint32_t
*
)
payload
=
TSDB_DATA_NCHAR_NULL
;
}
else
{
if
(
pVariant
->
nType
!=
TSDB_DATA_TYPE_NCHAR
)
{
toNchar
(
pVariant
,
&
payload
,
&
pVariant
->
nLen
);
}
else
{
wcsncpy
((
wchar_t
*
)
payload
,
pVariant
->
wpz
,
pVariant
->
nLen
);
}
}
}
else
{
if
(
pVariant
->
nType
!=
TSDB_DATA_TYPE_NCHAR
)
{
toNchar
(
pVariant
,
&
payload
,
&
pVariant
->
nLen
);
if
(
pVariant
->
nType
==
TSDB_DATA_TYPE_NULL
)
{
setVardataNull
(
payload
,
TSDB_DATA_TYPE_NCHAR
);
}
else
{
wcsncpy
((
wchar_t
*
)
payload
,
pVariant
->
wpz
,
pVariant
->
nLen
);
char
*
p
=
varDataVal
(
payload
);
if
(
pVariant
->
nType
!=
TSDB_DATA_TYPE_NCHAR
)
{
toNchar
(
pVariant
,
&
p
,
&
pVariant
->
nLen
);
}
else
{
wcsncpy
((
wchar_t
*
)
p
,
pVariant
->
wpz
,
pVariant
->
nLen
);
}
varDataSetLen
(
payload
,
pVariant
->
nLen
);
// the length may be changed after toNchar function called
assert
(
p
==
varDataVal
(
payload
));
}
}
break
;
}
}
...
...
src/tsdb/src/tsdbFile.c
浏览文件 @
c307a201
...
...
@@ -288,7 +288,11 @@ int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInf
static
int
compFGroupKey
(
const
void
*
key
,
const
void
*
fgroup
)
{
int
fid
=
*
(
int
*
)
key
;
SFileGroup
*
pFGroup
=
(
SFileGroup
*
)
fgroup
;
return
(
fid
-
pFGroup
->
fileId
);
if
(
fid
==
pFGroup
->
fileId
)
{
return
0
;
}
else
{
return
fid
>
pFGroup
->
fileId
?
1
:-
1
;
}
}
static
int
compFGroup
(
const
void
*
arg1
,
const
void
*
arg2
)
{
...
...
src/tsdb/src/tsdbMeta.c
浏览文件 @
c307a201
...
...
@@ -103,7 +103,8 @@ STable *tsdbDecodeTable(void *cont, int contLen) {
if
(
pTable
->
type
==
TSDB_STREAM_TABLE
)
{
ptr
=
taosDecodeString
(
ptr
,
&
(
pTable
->
sql
));
}
pTable
->
lastKey
=
TSKEY_INITIAL_VAL
;
return
pTable
;
}
...
...
@@ -118,7 +119,7 @@ static char* getTagIndexKey(const void* pData) {
STSchema
*
pSchema
=
tsdbGetTableTagSchema
(
elem
->
pMeta
,
elem
->
pTable
);
STColumn
*
pCol
=
&
pSchema
->
columns
[
DEFAULT_TAG_INDEX_COLUMN
];
int16_t
type
=
0
;
void
*
res
=
tdQueryTagByID
(
row
,
pCol
->
colId
,
&
type
);
void
*
res
=
tdQueryTagByID
(
row
,
pCol
->
colId
,
&
type
);
ASSERT
(
type
==
pCol
->
type
);
return
res
;
}
...
...
@@ -255,30 +256,18 @@ int32_t tsdbGetTableTagVal(TsdbRepoT* repo, STableId* id, int32_t colId, int16_t
STsdbMeta
*
pMeta
=
tsdbGetMeta
(
repo
);
STable
*
pTable
=
tsdbGetTableByUid
(
pMeta
,
id
->
uid
);
STSchema
*
pSchema
=
tsdbGetTableTagSchema
(
pMeta
,
pTable
);
STColumn
*
pCol
=
NULL
;
*
val
=
tdQueryTagByID
(
pTable
->
tagVal
,
colId
,
type
);
// todo binary search
for
(
int32_t
col
=
0
;
col
<
schemaNCols
(
pSchema
);
++
col
)
{
STColumn
*
p
=
schemaColAt
(
pSchema
,
col
);
if
(
p
->
colId
==
colId
)
{
pCol
=
p
;
break
;
if
(
*
val
!=
NULL
)
{
switch
(
*
type
)
{
case
TSDB_DATA_TYPE_BINARY
:
case
TSDB_DATA_TYPE_NCHAR
:
*
bytes
=
varDataLen
(
*
val
);
break
;
case
TSDB_DATA_TYPE_NULL
:
*
bytes
=
0
;
break
;
default:
*
bytes
=
tDataTypeDesc
[
*
type
].
nSize
;
break
;
}
}
if
(
pCol
==
NULL
)
{
return
-
1
;
// No matched tags. Maybe the modification of tags has not been done yet.
}
SDataRow
row
=
(
SDataRow
)
pTable
->
tagVal
;
int16_t
tagtype
=
0
;
char
*
d
=
tdQueryTagByID
(
row
,
pCol
->
colId
,
&
tagtype
);
//ASSERT((int8_t)tagtype == pCol->type)
*
val
=
d
;
*
type
=
pCol
->
type
;
*
bytes
=
pCol
->
bytes
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -405,7 +394,9 @@ int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg) {
return
-
1
;
}
}
table
->
lastKey
=
TSKEY_INITIAL_VAL
;
// Register to meta
if
(
newSuper
)
{
tsdbAddTableToMeta
(
pMeta
,
super
,
true
);
...
...
src/tsdb/src/tsdbRead.c
浏览文件 @
c307a201
此差异已折叠。
点击以展开。
src/util/inc/hash.h
浏览文件 @
c307a201
...
...
@@ -30,24 +30,19 @@ typedef void (*_hash_free_fn_t)(void *param);
typedef
struct
SHashNode
{
char
*
key
;
union
{
//
union {
struct
SHashNode
*
prev
;
struct
SHashEntry
*
prev1
;
};
//
struct SHashEntry *prev1;
//
};
//
struct
SHashNode
*
next
;
uint32_t
hashVal
;
// the hash value of key, if hashVal == HASH_VALUE_IN_TRASH, this node is moved to trash
uint32_t
keyLen
;
// length of the key
char
data
[];
}
SHashNode
;
typedef
struct
SHashEntry
{
SHashNode
*
next
;
uint32_t
num
;
}
SHashEntry
;
typedef
struct
SHashObj
{
SHash
Entry
**
hashList
;
SHash
Node
**
hashList
;
size_t
capacity
;
// number of slots
size_t
size
;
// number of elements in hash table
_hash_fn_t
hashFp
;
// hash function
...
...
src/util/src/hash.c
浏览文件 @
c307a201
...
...
@@ -83,17 +83,10 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
int32_t
len
=
MIN
(
length
,
HASH_MAX_CAPACITY
);
uint32_t
i
=
4
;
while
(
i
<
len
)
i
=
(
i
<<
1
U
);
while
(
i
<
len
)
i
=
(
i
<<
1
u
);
return
i
;
}
/**
* inplace update node in hash table
* @param pHashObj hash table object
* @param pNode hash data node
*/
static
void
doUpdateHashTable
(
SHashObj
*
pHashObj
,
SHashNode
*
pNode
);
/**
* Get SHashNode from hashlist, nodes from trash are not included.
* @param pHashObj Cache objection
...
...
@@ -105,10 +98,9 @@ static void doUpdateHashTable(SHashObj *pHashObj, SHashNode *pNode);
FORCE_INLINE
SHashNode
*
doGetNodeFromHashTable
(
SHashObj
*
pHashObj
,
const
void
*
key
,
uint32_t
keyLen
,
uint32_t
*
hashVal
)
{
uint32_t
hash
=
(
*
pHashObj
->
hashFp
)(
key
,
keyLen
);
int32_t
slot
=
HASH_INDEX
(
hash
,
pHashObj
->
capacity
);
SHash
Entry
*
pEntry
=
pHashObj
->
hashList
[
slot
];
int32_t
slot
=
HASH_INDEX
(
hash
,
pHashObj
->
capacity
);
SHash
Node
*
pNode
=
pHashObj
->
hashList
[
slot
];
SHashNode
*
pNode
=
pEntry
->
next
;
while
(
pNode
)
{
if
((
pNode
->
keyLen
==
keyLen
)
&&
(
memcmp
(
pNode
->
key
,
key
,
keyLen
)
==
0
))
{
break
;
...
...
@@ -190,17 +182,13 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe) {
pHashObj
->
hashFp
=
fn
;
pHashObj
->
hashList
=
(
SHash
Entry
**
)
calloc
(
pHashObj
->
capacity
,
sizeof
(
SHashEntry
*
)
);
pHashObj
->
hashList
=
(
SHash
Node
**
)
calloc
(
pHashObj
->
capacity
,
POINTER_BYTES
);
if
(
pHashObj
->
hashList
==
NULL
)
{
free
(
pHashObj
);
uError
(
"failed to allocate memory, reason:%s"
,
strerror
(
errno
));
return
NULL
;
}
for
(
int32_t
i
=
0
;
i
<
pHashObj
->
capacity
;
++
i
)
{
pHashObj
->
hashList
[
i
]
=
calloc
(
1
,
sizeof
(
SHashEntry
));
}
if
(
threadsafe
)
{
#if defined(LINUX)
pHashObj
->
lock
=
calloc
(
1
,
sizeof
(
pthread_rwlock_t
));
...
...
@@ -252,7 +240,18 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
return
-
1
;
}
doUpdateHashTable
(
pHashObj
,
pNewNode
);
if
(
pNewNode
->
prev
)
{
pNewNode
->
prev
->
next
=
pNewNode
;
}
else
{
int32_t
slot
=
HASH_INDEX
(
pNewNode
->
hashVal
,
pHashObj
->
capacity
);
assert
(
pHashObj
->
hashList
[
slot
]
==
pNode
);
pHashObj
->
hashList
[
slot
]
=
pNewNode
;
}
if
(
pNewNode
->
next
)
{
(
pNewNode
->
next
)
->
prev
=
pNewNode
;
}
}
__unlock
(
pHashObj
->
lock
);
...
...
@@ -287,24 +286,19 @@ void taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) {
}
SHashNode
*
pNext
=
pNode
->
next
;
if
(
pNode
->
prev
!
=
NULL
)
{
if
(
pNode
->
prev
=
=
NULL
)
{
int32_t
slot
=
HASH_INDEX
(
val
,
pHashObj
->
capacity
);
if
(
pHashObj
->
hashList
[
slot
]
->
next
==
pNode
)
{
pHashObj
->
hashList
[
slot
]
->
next
=
pNext
;
}
else
{
pNode
->
prev
->
next
=
pNext
;
}
assert
(
pHashObj
->
hashList
[
slot
]
==
pNode
);
pHashObj
->
hashList
[
slot
]
=
pNext
;
}
else
{
pNode
->
prev
->
next
=
pNext
;
}
if
(
pNext
!=
NULL
)
{
pNext
->
prev
=
pNode
->
prev
;
}
uint32_t
index
=
HASH_INDEX
(
pNode
->
hashVal
,
pHashObj
->
capacity
);
SHashEntry
*
pEntry
=
pHashObj
->
hashList
[
index
];
pEntry
->
num
--
;
pHashObj
->
size
--
;
pNode
->
next
=
NULL
;
...
...
@@ -325,8 +319,7 @@ void taosHashCleanup(SHashObj *pHashObj) {
if
(
pHashObj
->
hashList
)
{
for
(
int32_t
i
=
0
;
i
<
pHashObj
->
capacity
;
++
i
)
{
SHashEntry
*
pEntry
=
pHashObj
->
hashList
[
i
];
pNode
=
pEntry
->
next
;
pNode
=
pHashObj
->
hashList
[
i
];
while
(
pNode
)
{
pNext
=
pNode
->
next
;
...
...
@@ -337,8 +330,6 @@ void taosHashCleanup(SHashObj *pHashObj) {
free
(
pNode
);
pNode
=
pNext
;
}
tfree
(
pEntry
);
}
free
(
pHashObj
->
hashList
);
...
...
@@ -385,13 +376,13 @@ bool taosHashIterNext(SHashMutableIterator *pIter) {
assert
(
pIter
->
pCur
==
NULL
&&
pIter
->
pNext
==
NULL
);
while
(
1
)
{
SHash
Entry
*
pEntry
=
pIter
->
pHashObj
->
hashList
[
pIter
->
entryIndex
];
if
(
pEntry
->
next
==
NULL
)
{
SHash
Node
*
pEntry
=
pIter
->
pHashObj
->
hashList
[
pIter
->
entryIndex
];
if
(
pEntry
==
NULL
)
{
pIter
->
entryIndex
++
;
continue
;
}
pIter
->
pCur
=
pEntry
->
next
;
pIter
->
pCur
=
pEntry
;
if
(
pIter
->
pCur
->
next
)
{
pIter
->
pNext
=
pIter
->
pCur
->
next
;
...
...
@@ -444,25 +435,25 @@ int32_t taosHashGetMaxOverflowLinkLength(const SHashObj *pHashObj) {
int32_t
num
=
0
;
for
(
int32_t
i
=
0
;
i
<
pHashObj
->
size
;
++
i
)
{
SHashEntry
*
pEntry
=
pHashObj
->
hashList
[
i
];
if
(
num
<
pEntry
->
num
)
{
num
=
pEntry
->
num
;
SHashNode
*
pEntry
=
pHashObj
->
hashList
[
i
];
if
(
pEntry
==
NULL
)
{
continue
;
}
int32_t
j
=
0
;
while
(
pEntry
!=
NULL
)
{
pEntry
=
pEntry
->
next
;
j
++
;
}
if
(
num
<
j
)
{
num
=
j
;
}
}
return
num
;
}
void
doUpdateHashTable
(
SHashObj
*
pHashObj
,
SHashNode
*
pNode
)
{
if
(
pNode
->
prev1
)
{
pNode
->
prev1
->
next
=
pNode
;
}
if
(
pNode
->
next
)
{
(
pNode
->
next
)
->
prev
=
pNode
;
}
}
void
taosHashTableResize
(
SHashObj
*
pHashObj
)
{
if
(
pHashObj
->
size
<
pHashObj
->
capacity
*
HASH_DEFAULT_LOAD_FACTOR
)
{
return
;
...
...
@@ -479,69 +470,53 @@ void taosHashTableResize(SHashObj *pHashObj) {
return
;
}
// int64_t st = taosGetTimestampUs();
SHashEntry
**
pNewEntry
=
realloc
(
pHashObj
->
hashList
,
sizeof
(
SHashEntry
*
)
*
newSize
);
if
(
pNewEntry
==
NULL
)
{
void
*
pNewEntry
=
realloc
(
pHashObj
->
hashList
,
POINTER_BYTES
*
newSize
);
if
(
pNewEntry
==
NULL
)
{
// todo handle error
// uTrace("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity);
return
;
}
pHashObj
->
hashList
=
pNewEntry
;
for
(
int32_t
i
=
pHashObj
->
capacity
;
i
<
newSize
;
++
i
)
{
pHashObj
->
hashList
[
i
]
=
calloc
(
1
,
sizeof
(
SHashEntry
));
}
memset
(
&
pHashObj
->
hashList
[
pHashObj
->
capacity
],
0
,
POINTER_BYTES
*
(
newSize
-
pHashObj
->
capacity
));
pHashObj
->
capacity
=
newSize
;
for
(
int32_t
i
=
0
;
i
<
pHashObj
->
capacity
;
++
i
)
{
SHashEntry
*
pEntry
=
pHashObj
->
hashList
[
i
];
pNode
=
pEntry
->
next
;
pNode
=
pHashObj
->
hashList
[
i
];
if
(
pNode
!=
NULL
)
{
assert
(
pNode
->
prev
1
==
pEntry
&&
pEntry
->
num
>
0
);
assert
(
pNode
->
prev
==
NULL
);
}
while
(
pNode
)
{
int32_t
j
=
HASH_INDEX
(
pNode
->
hashVal
,
pHashObj
->
capacity
);
if
(
j
==
i
)
{
// this key
resid
es in the same slot, no need to relocate it
if
(
j
==
i
)
{
// this key
locat
es in the same slot, no need to relocate it
pNode
=
pNode
->
next
;
}
else
{
pNext
=
pNode
->
next
;
// remove from current slot
assert
(
pNode
->
prev1
!=
NULL
);
if
(
pNode
->
prev1
==
pEntry
)
{
// first node of the overflow linked list
pEntry
->
next
=
pNode
->
next
;
if
(
pNode
->
prev
==
NULL
)
{
// first node of the overflow linked list
pHashObj
->
hashList
[
i
]
=
pNext
;
}
else
{
pNode
->
prev
->
next
=
pN
ode
->
n
ext
;
pNode
->
prev
->
next
=
pNext
;
}
pEntry
->
num
--
;
assert
(
pEntry
->
num
>=
0
);
if
(
pNode
->
next
!=
NULL
)
{
(
pNode
->
next
)
->
prev
=
pNode
->
prev
;
if
(
pNext
!=
NULL
)
{
pNext
->
prev
=
pNode
->
prev
;
}
//
added into new slot
//
clear pointer
pNode
->
next
=
NULL
;
pNode
->
prev1
=
NULL
;
SHashEntry
*
pNewIndexEntry
=
pHashObj
->
hashList
[
j
];
pNode
->
prev
=
NULL
;
if
(
pNewIndexEntry
->
next
!=
NULL
)
{
assert
(
pNewIndexEntry
->
next
->
prev1
==
pNewIndexEntry
);
pNewIndexEntry
->
next
->
prev
=
pNode
;
// added into new slot
SHashNode
*
pNew
=
pHashObj
->
hashList
[
j
];
if
(
pNew
!=
NULL
)
{
assert
(
pNew
->
prev
==
NULL
);
pNew
->
prev
=
pNode
;
}
pNode
->
next
=
pNewIndexEntry
->
next
;
pNode
->
prev1
=
pNewIndexEntry
;
pNewIndexEntry
->
next
=
pNode
;
pNewIndexEntry
->
num
++
;
pNode
->
next
=
pNew
;
pHashObj
->
hashList
[
j
]
=
pNode
;
// continue
pNode
=
pNext
;
...
...
@@ -549,7 +524,6 @@ void taosHashTableResize(SHashObj *pHashObj) {
}
}
// int64_t et = taosGetTimestampUs();
// uTrace("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", pHashObj->capacity,
// ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0);
}
...
...
@@ -595,19 +569,17 @@ SHashNode *doUpdateHashNode(SHashNode *pNode, const void *key, size_t keyLen, co
void
doAddToHashTable
(
SHashObj
*
pHashObj
,
SHashNode
*
pNode
)
{
assert
(
pNode
!=
NULL
);
int32_t
index
=
HASH_INDEX
(
pNode
->
hashVal
,
pHashObj
->
capacity
);
SHashEntry
*
pEntry
=
pHashObj
->
hashList
[
index
];
pNode
->
next
=
pEntry
->
next
;
int32_t
index
=
HASH_INDEX
(
pNode
->
hashVal
,
pHashObj
->
capacity
);
if
(
pEntry
->
next
)
{
pEntry
->
next
->
prev
=
pNode
;
SHashNode
*
pEntry
=
pHashObj
->
hashList
[
index
];
if
(
pEntry
!=
NULL
)
{
pEntry
->
prev
=
pNode
;
pNode
->
next
=
pEntry
;
pNode
->
prev
=
NULL
;
}
pEntry
->
next
=
pNode
;
pNode
->
prev1
=
pEntry
;
pEntry
->
num
++
;
pHashObj
->
hashList
[
index
]
=
pNode
;
pHashObj
->
size
++
;
}
...
...
@@ -616,13 +588,13 @@ SHashNode *getNextHashNode(SHashMutableIterator *pIter) {
pIter
->
entryIndex
++
;
while
(
pIter
->
entryIndex
<
pIter
->
pHashObj
->
capacity
)
{
SHash
Entry
*
pEntry
=
pIter
->
pHashObj
->
hashList
[
pIter
->
entryIndex
];
if
(
p
Entry
->
next
==
NULL
)
{
SHash
Node
*
pNode
=
pIter
->
pHashObj
->
hashList
[
pIter
->
entryIndex
];
if
(
p
Node
==
NULL
)
{
pIter
->
entryIndex
++
;
continue
;
}
return
p
Entry
->
next
;
return
p
Node
;
}
return
NULL
;
...
...
src/util/src/tcompare.c
浏览文件 @
c307a201
...
...
@@ -92,7 +92,7 @@ int32_t compareLenPrefixedWStr(const void *pLeft, const void *pRight) {
if
(
len1
!=
len2
)
{
return
len1
>
len2
?
1
:-
1
;
}
else
{
int32_t
ret
=
wcsncmp
(
varDataVal
(
pLeft
),
varDataVal
(
pRight
),
len1
);
int32_t
ret
=
wcsncmp
(
varDataVal
(
pLeft
),
varDataVal
(
pRight
),
len1
/
TSDB_NCHAR_SIZE
);
if
(
ret
==
0
)
{
return
0
;
}
else
{
...
...
src/util/tests/hashTest.cpp
浏览文件 @
c307a201
...
...
@@ -149,8 +149,8 @@ int main(int argc, char** argv) {
}
TEST
(
testCase
,
hashTest
)
{
//
simpleTest();
//
stringKeyTest();
//
noLockPerformanceTest();
//
multithreadsTest();
simpleTest
();
stringKeyTest
();
noLockPerformanceTest
();
multithreadsTest
();
}
\ No newline at end of file
src/vnode/src/vnodeWrite.c
浏览文件 @
c307a201
...
...
@@ -139,12 +139,10 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
char
*
pTagData
=
pTable
->
data
+
totalCols
*
sizeof
(
SSchema
);
int
accumBytes
=
0
;
//dataRow = tdNewDataRowFromSchema(pDestTagSchema);
dataRow
=
tdNewTagRowFromSchema
(
pDestTagSchema
,
numOfTags
);
for
(
int
i
=
0
;
i
<
numOfTags
;
i
++
)
{
STColumn
*
pTCol
=
schemaColAt
(
pDestTagSchema
,
i
);
// tdAppendColVal(dataRow, pTagData + accumBytes, pTCol->type, pTCol->bytes, pTCol->offset);
tdAppendTagColVal
(
dataRow
,
pTagData
+
accumBytes
,
pTCol
->
type
,
pTCol
->
bytes
,
pTCol
->
colId
);
accumBytes
+=
htons
(
pSchema
[
i
+
numOfColumns
].
bytes
);
}
...
...
tests/examples/c/demo.c
浏览文件 @
c307a201
...
...
@@ -16,11 +16,12 @@
// TAOS standard API example. The same syntax as MySQL, but only a subet
// to compile: gcc -o demo demo.c -ltaos
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <taos.h> // TAOS header file
#include <unistd.h>
void
taosMsleep
(
int
mseconds
);
...
...
@@ -49,19 +50,52 @@ static int32_t doQuery(TAOS* taos, const char* sql) {
return
0
;
}
void
*
oneLoader
(
void
*
param
)
{
TAOS
*
conn
=
(
TAOS
*
)
param
;
for
(
int32_t
i
=
0
;
i
<
20000
;
++
i
)
{
// doQuery(conn, "show databases");
doQuery
(
conn
,
"use test"
);
// doQuery(conn, "describe t12");
// doQuery(conn, "show tables");
// doQuery(conn, "create table if not exists abc (ts timestamp, k int)");
// doQuery(conn, "select * from t12");
}
return
0
;
}
static
__attribute__
((
unused
))
void
multiThreadTest
(
int32_t
numOfThreads
,
void
*
conn
)
{
pthread_attr_t
thattr
;
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
pthread_t
*
threadId
=
malloc
(
sizeof
(
pthread_t
)
*
numOfThreads
);
for
(
int
i
=
0
;
i
<
numOfThreads
;
++
i
)
{
pthread_create
(
&
threadId
[
i
],
NULL
,
oneLoader
,
conn
);
}
for
(
int32_t
i
=
0
;
i
<
numOfThreads
;
++
i
)
{
pthread_join
(
threadId
[
i
],
NULL
);
}
pthread_attr_destroy
(
&
thattr
);
}
int
main
(
int
argc
,
char
*
argv
[])
{
TAOS
*
taos
;
char
qstr
[
1024
];
TAOS_RES
*
result
;
// connect to server
if
(
argc
<
2
)
{
printf
(
"please input server-ip
\n
"
);
return
0
;
}
taos_options
(
TSDB_OPTION_CONFIGDIR
,
"
~/sec
/cfg"
);
taos_options
(
TSDB_OPTION_CONFIGDIR
,
"
/home/lisa/Documents/workspace/TDinternal/community/sim/tsim
/cfg"
);
// init TAOS
taos_init
();
...
...
@@ -73,15 +107,12 @@ int main(int argc, char *argv[]) {
}
printf
(
"success to connect to server
\n
"
);
doQuery
(
taos
,
"create database if not exists test"
);
doQuery
(
taos
,
"use test"
);
doQuery
(
taos
,
"select count(*) from m1 where ts>='2020-1-1 1:1:1' and ts<='2020-1-1 1:1:59' interval(500a) fill(value, 99)"
);
// doQuery(taos, "create table t1(ts timestamp, k binary(12), f nchar(2))");
// for(int32_t i = 0; i< 100000; ++i) {
// doQuery(taos, "select m1.ts,m1.a from m1, m2 where m1.ts=m2.ts and m1.a=m2.b;");
// usleep(500000);
// multiThreadTest(1, taos);
doQuery
(
taos
,
"select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_db0.lm2_stb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(value, -1, -2) group by t1 limit 2 offset 10;"
);
// for(int32_t i = 0; i < 100000; ++i) {
// doQuery(taos, "insert into t1 values(now, 2)");
// }
// doQuery(taos, "create table t1(ts timestamp, k binary(12), f nchar(2))");
// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:1', 'abc')");
// doQuery(taos, "create table if not exists tm0 (ts timestamp, k int);");
...
...
tests/script/general/import/commit.sim
浏览文件 @
c307a201
...
...
@@ -78,7 +78,8 @@ sleep 5000
print ========= step4
sql select * from ic2db.tb;
if $rows != 13 then
if $rows != 13 then
print expect 13, actual:$rows
return -1
endi
...
...
tests/script/general/parser/create_db.sim
浏览文件 @
c307a201
...
...
@@ -108,10 +108,10 @@ $cache = 16 # 16MB
$ablocks = 100
$tblocks = 32 # max=512, automatically trimmed when exceeding
$ctime = 36000 # 10 hours
$wal =
0 # valid value is 0,
1, 2
$wal =
1 # valid value is
1, 2
$comp = 1 # max=32, automatically trimmed when exceeding
sql create database $db replica $replica days $days keep $keep maxrows $rows_db cache $cache ctime $ctime wal $wal comp $comp
sql create database $db replica $replica days $days keep $keep maxrows $rows_db cache $cache
blocks 4
ctime $ctime wal $wal comp $comp
sql show databases
if $rows != 1 then
return -1
...
...
@@ -129,18 +129,15 @@ if $data06 != 365,365,365 then
return -1
endi
print data08 = $data08
if $data08 != $rows_db then
if $data08 != $cache then
print expect $cache, actual:$data08
return -1
endi
if $data09 !=
$cache then
if $data09 !=
4 then
return -1
endi
sql drop database $db
# ablocks_smaller_than_tblocks
#$ablocks = 50
#$tblocks = 100
#sql_error create database $db ablocks $ablocks tblocks $tblocks
sql drop database $db
## param range tests
# replica [1,3]
...
...
@@ -160,14 +157,11 @@ sql_error create database $db maxrows 199
#sql_error create database $db maxrows 10001
# cache [100, 10485760]
sql_error create database $db cache
99
sql_error create database $db cache
0
#sql_error create database $db cache 10485761
# ablocks [overwriten by 4*maxtablesPerVnode, 409600]
sql_error create database $db ablocks -1
#sql_error create database $db ablocks 409601
#
t
blocks [32, 4096 overwriten by 4096 if exceeds, Note added:2018-10-24]
# blocks [32, 4096 overwriten by 4096 if exceeds, Note added:2018-10-24]
#sql_error create database $db tblocks 31
#sql_error create database $db tblocks 4097
...
...
@@ -175,9 +169,10 @@ sql_error create database $db ablocks -1
sql_error create database $db ctime 29
sql_error create database $db ctime 40961
# wal {0, 1}
# wal {1, 2}
sql_error create database $db wal 0
sql_error create database $db wal -1
#sql_error create database $db wal 2
sql_error create database $db wal 3
# comp {0, 1, 2}
sql_error create database $db comp -1
...
...
tests/script/general/parser/interp_test.sim
浏览文件 @
c307a201
...
...
@@ -117,6 +117,7 @@ $tb = $tbPrefix . 0
return -1
endi
if $data01 != NULL then
print expect NULL, actual $data01
return -1
endi
if $data02 != NULL then
...
...
@@ -213,6 +214,7 @@ $tb = $tbPrefix . 0
return -1
endi
if $data03 != 0.00000 then
print expect 0.00000, actual:$data03
return -1
endi
# if $data04 != NULL then
...
...
tests/script/general/parser/limit1_stb.sim
浏览文件 @
c307a201
...
...
@@ -400,6 +400,7 @@ endi
$limit = $totalNum / 2
sql select max(c1), min(c2), avg(c3), count(c4), sum(c5), spread(c6), first(c7), last(c8), first(c9) from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 8 group by t1 order by t1 asc limit $limit offset 0
if $rows != 6 then
print expect 6, actual:$rows
return -1
endi
if $data00 != 9 then
...
...
tests/script/general/parser/testSuite.sim
浏览文件 @
c307a201
...
...
@@ -8,38 +8,38 @@
#sleep 2000
#run general/parser/auto_create_tb_drop_tb.sim
sleep 2000
run general/parser/col_arithmetic_operation.sim
sleep 2000
run general/parser/columnValue.sim
sleep 2000
run general/parser/commit.sim
sleep 2000
run general/parser/create_db.sim
sleep 2000
run general/parser/create_mt.sim
sleep 2000
run general/parser/create_tb.sim
sleep 2000
run general/parser/dbtbnameValidate.sim
sleep 2000
run general/parser/import_commit1.sim
sleep 2000
run general/parser/import_commit2.sim
sleep 2000
run general/parser/import_commit3.sim
sleep 2000
run general/parser/insert_tb.sim
sleep 2000
run general/parser/first_last.sim
sleep 2000
run general/parser/import_file.sim
sleep 2000
run general/parser/lastrow.sim
sleep 2000
run general/parser/nchar.sim
sleep 2000
run general/parser/null_char.sim
#
sleep 2000
#
run general/parser/col_arithmetic_operation.sim
#
sleep 2000
#
run general/parser/columnValue.sim
#
sleep 2000
#
run general/parser/commit.sim
#
sleep 2000
#
run general/parser/create_db.sim
#
sleep 2000
#
run general/parser/create_mt.sim
#
sleep 2000
#
run general/parser/create_tb.sim
#
sleep 2000
#
run general/parser/dbtbnameValidate.sim
#
sleep 2000
#
run general/parser/import_commit1.sim
#
sleep 2000
#
run general/parser/import_commit2.sim
#
sleep 2000
#
run general/parser/import_commit3.sim
#
sleep 2000
#
run general/parser/insert_tb.sim
#
sleep 2000
#
run general/parser/first_last.sim
#
sleep 2000
##
run general/parser/import_file.sim
#
sleep 2000
#
run general/parser/lastrow.sim
#
sleep 2000
#
run general/parser/nchar.sim
#
sleep 2000
##
run general/parser/null_char.sim
sleep 2000
run general/parser/single_row_in_tb.sim
sleep 2000
...
...
@@ -62,20 +62,23 @@ sleep 2000
run general/parser/tbnameIn.sim
sleep 2000
run general/parser/projection_limit_offset.sim
sleep 2000
run general/parser/limit2.sim
sleep 2000
run general/parser/slimit.sim
sleep 2000
run general/parser/fill.sim
sleep 2000
run general/parser/fill_stb.sim
sleep 2000
run general/parser/tags_dynamically_specifiy.sim
run general/parser/where.sim
sleep 2000
run general/parser/slimit.sim
sleep 2000
run general/parser/select_with_tags.sim
sleep 2000
run general/parser/interp.sim
sleep 2000
run general/parser/tags_dynamically_specifiy.sim
sleep 2000
run general/parser/set_tag_vals.sim
...
...
@@ -86,8 +89,6 @@ run general/parser/stream_on_sys.sim
sleep 2000
run general/parser/stream.sim
sleep 2000
run general/parser/where.sim
sleep 2000
#run general/parser/repeatAlter.sim
sleep 2000
...
...
@@ -97,11 +98,8 @@ run general/parser/join.sim
sleep 2000
run general/parser/join_multivnode.sim
sleep 2000
run general/parser/select_with_tags.sim
sleep 2000
run general/parser/groupby.sim
sleep 2000
run general/parser/binary_escapeCharacter.sim
sleep 2000
...
...
tests/script/general/parser/where.sim
浏览文件 @
c307a201
...
...
@@ -29,23 +29,23 @@ $i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
sql create table $tb using $mt tags( $i )
$x = 0
while $x < $rowNum
$ms = $x . m
$ms = $x . m
$c = $x / 100
$c = $c * 100
$c = $x - $c
$binary = 'binary . $c
$binary = 'binary . $c
$binary = $binary . '
$nchar = 'nchar . $c
$nchar = 'nchar . $c
$nchar = $nchar . '
sql insert into $tb values (now + $ms , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar )
sql insert into $tb values (now + $ms , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar )
$x = $x + 1
endw
endw
$i = $i + 1
endw
endw
sleep 100
...
...
@@ -78,12 +78,16 @@ sql select tbname from $mt where t1 < 2
if $rows != 2 then
return -1
endi
print $tbPrefix
$tb = $tbPrefix . 0
if $data00 != $tb then
if $data00 != wh_tb1 then
print expect wh_tb1, actual:$data00
return -1
endi
$tb = $tbPrefix . 1
if $data10 != $tb then
if $data10 != wh_tb0 then
print expect wh_tb0, actual:$data00
return -1
endi
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录