Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
dfc7c836
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
dfc7c836
编写于
10月 29, 2020
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' into feature/update
上级
f879d6b0
6bbbd153
变更
13
展开全部
隐藏空白更改
内联
并排
Showing
13 changed file
with
1415 addition
and
56 deletion
+1415
-56
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+5
-8
src/query/inc/qUtil.h
src/query/inc/qUtil.h
+1
-1
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+36
-35
src/query/src/qUtil.c
src/query/src/qUtil.c
+5
-4
src/rpc/src/rpcMain.c
src/rpc/src/rpcMain.c
+6
-1
src/tsdb/src/tsdbRead.c
src/tsdb/src/tsdbRead.c
+0
-1
src/util/src/tref.c
src/util/src/tref.c
+7
-6
tests/examples/JDBC/JDBCDemo/README-jdbc-windows.md
tests/examples/JDBC/JDBCDemo/README-jdbc-windows.md
+268
-0
tests/pytest/fulltest.sh
tests/pytest/fulltest.sh
+1
-0
tests/pytest/query/queryCountCSVData.py
tests/pytest/query/queryCountCSVData.py
+71
-0
tests/pytest/test_data/__init__.py
tests/pytest/test_data/__init__.py
+15
-0
tests/pytest/test_data/disordered.csv
tests/pytest/test_data/disordered.csv
+500
-0
tests/pytest/test_data/ordered.csv
tests/pytest/test_data/ordered.csv
+500
-0
未找到文件。
src/query/inc/qExecutor.h
浏览文件 @
dfc7c836
...
...
@@ -33,15 +33,11 @@ struct SColumnFilterElem;
typedef
bool
(
*
__filter_func_t
)(
struct
SColumnFilterElem
*
pFilter
,
char
*
val1
,
char
*
val2
);
typedef
int32_t
(
*
__block_search_fn_t
)(
char
*
data
,
int32_t
num
,
int64_t
key
,
int32_t
order
);
typedef
struct
SPosInfo
{
int32_t
pageId
:
20
;
int32_t
rowId
:
12
;
}
SPosInfo
;
typedef
struct
SGroupResInfo
{
int32_t
groupId
;
int32_t
numOfDataPages
;
SPosInfo
pos
;
int32_t
pageId
;
int32_t
rowId
;
}
SGroupResInfo
;
typedef
struct
SSqlGroupbyExpr
{
...
...
@@ -53,9 +49,10 @@ typedef struct SSqlGroupbyExpr {
}
SSqlGroupbyExpr
;
typedef
struct
SWindowResult
{
SPosInfo
pos
;
// Position of current result in disk-based output buffer
int32_t
pageId
;
// pageId & rowId is the position of current result in disk-based output buffer
int32_t
rowId
:
15
;
bool
closed
:
1
;
// this result status: closed or opened
uint16_t
numOfRows
;
// number of rows of current time window
bool
closed
;
// this result status: closed or opened
SResultInfo
*
resultInfo
;
// For each result column, there is a resultInfo
union
{
STimeWindow
win
;
char
*
key
;};
// start key of current time window
}
SWindowResult
;
...
...
src/query/inc/qUtil.h
浏览文件 @
dfc7c836
...
...
@@ -51,7 +51,7 @@ static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int3
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
int32_t
realRowId
=
(
int32_t
)(
pResult
->
pos
.
rowId
*
GET_ROW_PARAM_FOR_MULTIOUTPUT
(
pQuery
,
pRuntimeEnv
->
topBotQuery
,
pRuntimeEnv
->
stableQuery
));
int32_t
realRowId
=
(
int32_t
)(
pResult
->
rowId
*
GET_ROW_PARAM_FOR_MULTIOUTPUT
(
pQuery
,
pRuntimeEnv
->
topBotQuery
,
pRuntimeEnv
->
stableQuery
));
return
((
char
*
)
page
->
data
)
+
pRuntimeEnv
->
offset
[
columnIndex
]
*
pRuntimeEnv
->
numOfRowsPerPage
+
pQuery
->
pSelectExpr
[
columnIndex
].
bytes
*
realRowId
;
}
...
...
src/query/src/qExecutor.c
浏览文件 @
dfc7c836
...
...
@@ -557,7 +557,7 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t
static
int32_t
addNewWindowResultBuf
(
SWindowResult
*
pWindowRes
,
SDiskbasedResultBuf
*
pResultBuf
,
int32_t
sid
,
int32_t
numOfRowsPerPage
)
{
if
(
pWindowRes
->
p
os
.
p
ageId
!=
-
1
)
{
if
(
pWindowRes
->
pageId
!=
-
1
)
{
return
0
;
}
...
...
@@ -590,11 +590,11 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult
}
// set the number of rows in current disk page
if
(
pWindowRes
->
p
os
.
p
ageId
==
-
1
)
{
// not allocated yet, allocate new buffer
pWindowRes
->
p
os
.
p
ageId
=
pageId
;
pWindowRes
->
pos
.
rowId
=
(
int32_t
)(
pData
->
num
++
);
if
(
pWindowRes
->
pageId
==
-
1
)
{
// not allocated yet, allocate new buffer
pWindowRes
->
pageId
=
pageId
;
pWindowRes
->
rowId
=
(
int32_t
)(
pData
->
num
++
);
assert
(
pWindowRes
->
p
os
.
p
ageId
>=
0
);
assert
(
pWindowRes
->
pageId
>=
0
);
}
return
0
;
...
...
@@ -616,7 +616,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes
*
newWind
=
true
;
// not assign result buffer yet, add new result buffer
if
(
pWindowRes
->
p
os
.
p
ageId
==
-
1
)
{
if
(
pWindowRes
->
pageId
==
-
1
)
{
int32_t
ret
=
addNewWindowResultBuf
(
pWindowRes
,
pResultBuf
,
sid
,
pRuntimeEnv
->
numOfRowsPerPage
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
-
1
;
...
...
@@ -1143,7 +1143,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
assert
(
pRuntimeEnv
->
windowResInfo
.
interval
==
0
);
if
(
pWindowRes
->
p
os
.
p
ageId
==
-
1
)
{
if
(
pWindowRes
->
pageId
==
-
1
)
{
int32_t
ret
=
addNewWindowResultBuf
(
pWindowRes
,
pResultBuf
,
GROUPRESULTID
,
pRuntimeEnv
->
numOfRowsPerPage
);
if
(
ret
!=
0
)
{
return
-
1
;
...
...
@@ -2652,7 +2652,7 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQLFunctionCtx
*
pCtx
=
pRuntimeEnv
->
pCtx
;
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pWindowRes
->
p
os
.
p
ageId
);
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pWindowRes
->
pageId
);
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
base
.
functionId
;
...
...
@@ -2823,14 +2823,14 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param)
SWindowResInfo
*
pWindowResInfo1
=
&
supporter
->
pTableQueryInfo
[
left
]
->
windowResInfo
;
SWindowResult
*
pWindowRes1
=
getWindowResult
(
pWindowResInfo1
,
leftPos
);
tFilePage
*
page1
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pWindowRes1
->
p
os
.
p
ageId
);
tFilePage
*
page1
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pWindowRes1
->
pageId
);
char
*
b1
=
getPosInResultPage
(
pRuntimeEnv
,
PRIMARYKEY_TIMESTAMP_COL_INDEX
,
pWindowRes1
,
page1
);
TSKEY
leftTimestamp
=
GET_INT64_VAL
(
b1
);
SWindowResInfo
*
pWindowResInfo2
=
&
supporter
->
pTableQueryInfo
[
right
]
->
windowResInfo
;
SWindowResult
*
pWindowRes2
=
getWindowResult
(
pWindowResInfo2
,
rightPos
);
tFilePage
*
page2
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pWindowRes2
->
p
os
.
p
ageId
);
tFilePage
*
page2
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pWindowRes2
->
pageId
);
char
*
b2
=
getPosInResultPage
(
pRuntimeEnv
,
PRIMARYKEY_TIMESTAMP_COL_INDEX
,
pWindowRes2
,
page2
);
TSKEY
rightTimestamp
=
GET_INT64_VAL
(
b2
);
...
...
@@ -2867,7 +2867,7 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
}
SGroupResInfo
*
info
=
&
pQInfo
->
groupResInfo
;
if
(
pQInfo
->
groupIndex
==
numOfGroups
&&
info
->
p
os
.
p
ageId
==
info
->
numOfDataPages
)
{
if
(
pQInfo
->
groupIndex
==
numOfGroups
&&
info
->
pageId
==
info
->
numOfDataPages
)
{
SET_STABLE_QUERY_OVER
(
pQInfo
);
}
...
...
@@ -2883,10 +2883,10 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
SGroupResInfo
*
pGroupResInfo
=
&
pQInfo
->
groupResInfo
;
// all results have been return to client, try next group
if
(
pGroupResInfo
->
p
os
.
p
ageId
==
pGroupResInfo
->
numOfDataPages
)
{
if
(
pGroupResInfo
->
pageId
==
pGroupResInfo
->
numOfDataPages
)
{
pGroupResInfo
->
numOfDataPages
=
0
;
pGroupResInfo
->
p
os
.
p
ageId
=
0
;
pGroupResInfo
->
pos
.
rowId
=
0
;
pGroupResInfo
->
pageId
=
0
;
pGroupResInfo
->
rowId
=
0
;
// current results of group has been sent to client, try next group
if
(
mergeIntoGroupResult
(
pQInfo
)
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -2914,22 +2914,22 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
assert
(
size
==
pGroupResInfo
->
numOfDataPages
);
bool
done
=
false
;
for
(
int32_t
j
=
pGroupResInfo
->
p
os
.
p
ageId
;
j
<
size
;
++
j
)
{
for
(
int32_t
j
=
pGroupResInfo
->
pageId
;
j
<
size
;
++
j
)
{
SPageInfo
*
pi
=
*
(
SPageInfo
**
)
taosArrayGet
(
list
,
j
);
tFilePage
*
pData
=
getResBufPage
(
pResultBuf
,
pi
->
pageId
);
assert
(
pData
->
num
>
0
&&
pData
->
num
<=
pRuntimeEnv
->
numOfRowsPerPage
&&
pGroupResInfo
->
pos
.
rowId
<
pData
->
num
);
int32_t
numOfRes
=
(
int32_t
)(
pData
->
num
-
pGroupResInfo
->
pos
.
rowId
);
assert
(
pData
->
num
>
0
&&
pData
->
num
<=
pRuntimeEnv
->
numOfRowsPerPage
&&
pGroupResInfo
->
rowId
<
pData
->
num
);
int32_t
numOfRes
=
(
int32_t
)(
pData
->
num
-
pGroupResInfo
->
rowId
);
if
(
numOfRes
>
pQuery
->
rec
.
capacity
-
offset
)
{
numOfCopiedRows
=
(
int32_t
)(
pQuery
->
rec
.
capacity
-
offset
);
pGroupResInfo
->
pos
.
rowId
+=
numOfCopiedRows
;
pGroupResInfo
->
rowId
+=
numOfCopiedRows
;
done
=
true
;
}
else
{
numOfCopiedRows
=
(
int32_t
)
pData
->
num
;
pGroupResInfo
->
p
os
.
p
ageId
+=
1
;
pGroupResInfo
->
pos
.
rowId
=
0
;
pGroupResInfo
->
pageId
+=
1
;
pGroupResInfo
->
rowId
=
0
;
}
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
...
...
@@ -3020,8 +3020,8 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
pGroupResInfo
->
numOfDataPages
=
(
int32_t
)
taosArrayGetSize
(
pageList
);
pGroupResInfo
->
groupId
=
tid
;
pGroupResInfo
->
p
os
.
p
ageId
=
0
;
pGroupResInfo
->
pos
.
rowId
=
0
;
pGroupResInfo
->
pageId
=
0
;
pGroupResInfo
->
rowId
=
0
;
return
pGroupResInfo
->
numOfDataPages
;
}
...
...
@@ -3067,7 +3067,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
SWindowResInfo
*
pWindowResInfo
=
&
pTableList
[
pos
]
->
windowResInfo
;
SWindowResult
*
pWindowRes
=
getWindowResult
(
pWindowResInfo
,
cs
.
position
[
pos
]);
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pWindowRes
->
p
os
.
p
ageId
);
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pWindowRes
->
pageId
);
char
*
b
=
getPosInResultPage
(
pRuntimeEnv
,
PRIMARYKEY_TIMESTAMP_COL_INDEX
,
pWindowRes
,
page
);
TSKEY
ts
=
GET_INT64_VAL
(
b
);
...
...
@@ -3104,7 +3104,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
lastTimestamp
=
ts
;
// move to the next element of current entry
int32_t
currentPageId
=
pWindowRes
->
p
os
.
p
ageId
;
int32_t
currentPageId
=
pWindowRes
->
pageId
;
cs
.
position
[
pos
]
+=
1
;
if
(
cs
.
position
[
pos
]
>=
pWindowResInfo
->
size
)
{
...
...
@@ -3117,7 +3117,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
}
else
{
// current page is not needed anymore
SWindowResult
*
pNextWindowRes
=
getWindowResult
(
pWindowResInfo
,
cs
.
position
[
pos
]);
if
(
pNextWindowRes
->
p
os
.
p
ageId
!=
currentPageId
)
{
if
(
pNextWindowRes
->
pageId
!=
currentPageId
)
{
releaseResBufPage
(
pRuntimeEnv
->
pResultBuf
,
page
);
}
}
...
...
@@ -3329,7 +3329,8 @@ int32_t createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool is
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
pResultRow
->
pos
=
(
SPosInfo
)
{
-
1
,
-
1
};
pResultRow
->
pageId
=
-
1
;
pResultRow
->
rowId
=
-
1
;
char
*
buf
=
(
char
*
)
pResultRow
->
resultInfo
+
numOfCols
*
sizeof
(
SResultInfo
);
...
...
@@ -3796,7 +3797,7 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
* not assign result buffer yet, add new result buffer
* all group belong to one result set, and each group result has different group id so set the id to be one
*/
if
(
pWindowRes
->
p
os
.
p
ageId
==
-
1
)
{
if
(
pWindowRes
->
pageId
==
-
1
)
{
if
(
addNewWindowResultBuf
(
pWindowRes
,
pRuntimeEnv
->
pResultBuf
,
groupIndex
,
pRuntimeEnv
->
numOfRowsPerPage
)
!=
TSDB_CODE_SUCCESS
)
{
return
;
...
...
@@ -3813,7 +3814,7 @@ void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pResult
->
p
os
.
p
ageId
);
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pResult
->
pageId
);
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
SQLFunctionCtx
*
pCtx
=
&
pRuntimeEnv
->
pCtx
[
i
];
...
...
@@ -3840,7 +3841,7 @@ void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
tFilePage
*
bufPage
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pResult
->
p
os
.
p
ageId
);
tFilePage
*
bufPage
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pResult
->
pageId
);
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
SQLFunctionCtx
*
pCtx
=
&
pRuntimeEnv
->
pCtx
[
i
];
...
...
@@ -4019,12 +4020,12 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_
for
(
int32_t
i
=
startIdx
;
(
i
<
totalSet
)
&&
(
i
>=
0
);
i
+=
step
)
{
if
(
result
[
i
].
numOfRows
==
0
)
{
pQInfo
->
groupIndex
+=
1
;
pGroupResInfo
->
pos
.
rowId
=
0
;
pGroupResInfo
->
rowId
=
0
;
continue
;
}
int32_t
numOfRowsToCopy
=
result
[
i
].
numOfRows
-
pGroupResInfo
->
pos
.
rowId
;
int32_t
oldOffset
=
pGroupResInfo
->
pos
.
rowId
;
int32_t
numOfRowsToCopy
=
result
[
i
].
numOfRows
-
pGroupResInfo
->
rowId
;
int32_t
oldOffset
=
pGroupResInfo
->
rowId
;
/*
* current output space is not enough to accommodate all data of this page, only partial results
...
...
@@ -4032,13 +4033,13 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_
*/
if
(
numOfRowsToCopy
>
pQuery
->
rec
.
capacity
-
numOfResult
)
{
numOfRowsToCopy
=
(
int32_t
)
pQuery
->
rec
.
capacity
-
numOfResult
;
pGroupResInfo
->
pos
.
rowId
+=
numOfRowsToCopy
;
pGroupResInfo
->
rowId
+=
numOfRowsToCopy
;
}
else
{
pGroupResInfo
->
pos
.
rowId
=
0
;
pGroupResInfo
->
rowId
=
0
;
pQInfo
->
groupIndex
+=
1
;
}
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
result
[
i
].
p
os
.
p
ageId
);
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
result
[
i
].
pageId
);
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
int32_t
size
=
pRuntimeEnv
->
pCtx
[
j
].
outputBytes
;
...
...
src/query/src/qUtil.c
浏览文件 @
dfc7c836
...
...
@@ -266,7 +266,7 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
return
;
}
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pWindowRes
->
p
os
.
p
ageId
);
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pWindowRes
->
pageId
);
for
(
int32_t
i
=
0
;
i
<
pRuntimeEnv
->
pQuery
->
numOfOutput
;
++
i
)
{
SResultInfo
*
pResultInfo
=
&
pWindowRes
->
resultInfo
[
i
];
...
...
@@ -279,7 +279,8 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
}
pWindowRes
->
numOfRows
=
0
;
pWindowRes
->
pos
=
(
SPosInfo
){
-
1
,
-
1
};
pWindowRes
->
pageId
=
-
1
;
pWindowRes
->
rowId
=
-
1
;
pWindowRes
->
closed
=
false
;
pWindowRes
->
win
=
TSWINDOW_INITIALIZER
;
}
...
...
@@ -308,10 +309,10 @@ void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, con
memcpy
(
pDst
->
interResultBuf
,
pSrc
->
interResultBuf
,
pDst
->
bufLen
);
// copy the output buffer data from src to dst, the position info keep unchanged
tFilePage
*
dstpage
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
dst
->
p
os
.
p
ageId
);
tFilePage
*
dstpage
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
dst
->
pageId
);
char
*
dstBuf
=
getPosInResultPage
(
pRuntimeEnv
,
i
,
dst
,
dstpage
);
tFilePage
*
srcpage
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
src
->
p
os
.
p
ageId
);
tFilePage
*
srcpage
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
src
->
pageId
);
char
*
srcBuf
=
getPosInResultPage
(
pRuntimeEnv
,
i
,
(
SWindowResult
*
)
src
,
srcpage
);
size_t
s
=
pRuntimeEnv
->
pQuery
->
pSelectExpr
[
i
].
bytes
;
...
...
src/rpc/src/rpcMain.c
浏览文件 @
dfc7c836
...
...
@@ -215,6 +215,11 @@ static void rpcUnlockConn(SRpcConn *pConn);
static
void
rpcAddRef
(
SRpcInfo
*
pRpc
);
static
void
rpcDecRef
(
SRpcInfo
*
pRpc
);
static
void
rpcFree
(
void
*
p
)
{
tTrace
(
"free mem: %p"
,
p
);
free
(
p
);
}
static
void
rpcInit
(
void
)
{
tsProgressTimer
=
tsRpcTimer
/
2
;
...
...
@@ -222,7 +227,7 @@ static void rpcInit(void) {
tsRpcHeadSize
=
RPC_MSG_OVERHEAD
;
tsRpcOverhead
=
sizeof
(
SRpcReqContext
);
tsRpcRefId
=
taosOpenRef
(
200
,
f
ree
);
tsRpcRefId
=
taosOpenRef
(
200
,
rpcF
ree
);
}
void
*
rpcOpen
(
const
SRpcInit
*
pInit
)
{
...
...
src/tsdb/src/tsdbRead.c
浏览文件 @
dfc7c836
...
...
@@ -1271,7 +1271,6 @@ static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STabl
int32_t
end
=
endPos
;
if
(
!
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
))
{
assert
(
start
>=
end
);
SWAP
(
start
,
end
,
int32_t
);
}
...
...
src/util/src/tref.c
浏览文件 @
dfc7c836
...
...
@@ -159,8 +159,8 @@ int taosAddRef(int refId, void *p)
taosLockList
(
pSet
->
lockedBy
+
hash
);
pNode
=
pSet
->
nodeList
[
hash
];
while
(
pNode
)
{
if
(
pNode
->
p
==
p
)
while
(
pNode
)
{
if
(
pNode
->
p
==
p
)
break
;
pNode
=
pNode
->
next
;
...
...
@@ -176,8 +176,9 @@ int taosAddRef(int refId, void *p)
pNode
->
count
=
1
;
pNode
->
prev
=
0
;
pNode
->
next
=
pSet
->
nodeList
[
hash
];
if
(
pSet
->
nodeList
[
hash
])
pSet
->
nodeList
[
hash
]
->
prev
=
pNode
;
pSet
->
nodeList
[
hash
]
=
pNode
;
uTrace
(
"refId:%d p:%p is added, count:
:%d"
,
refId
,
p
,
pSet
->
count
);
uTrace
(
"refId:%d p:%p is added, count:
%d malloc mem: %p"
,
refId
,
p
,
pSet
->
count
,
pNode
);
}
else
{
code
=
TSDB_CODE_REF_NO_MEMORY
;
uTrace
(
"refId:%d p:%p is not added, since no memory"
,
refId
,
p
);
...
...
@@ -197,7 +198,7 @@ int taosAcquireRef(int refId, void *p)
SRefNode
*
pNode
;
SRefSet
*
pSet
;
if
(
refId
<
0
||
refId
>=
TSDB_REF_OBJECTS
)
{
if
(
refId
<
0
||
refId
>=
TSDB_REF_OBJECTS
)
{
uTrace
(
"refId:%d p:%p failed to acquire, refId not valid"
,
refId
,
p
);
return
TSDB_CODE_REF_INVALID_ID
;
}
...
...
@@ -267,7 +268,7 @@ void taosReleaseRef(int refId, void *p)
pNode
=
pSet
->
nodeList
[
hash
];
while
(
pNode
)
{
if
(
pNode
->
p
==
p
)
if
(
pNode
->
p
==
p
)
break
;
pNode
=
pNode
->
next
;
...
...
@@ -291,7 +292,7 @@ void taosReleaseRef(int refId, void *p)
free
(
pNode
);
released
=
1
;
uTrace
(
"refId:%d p:%p is removed, count:
:%d"
,
refId
,
p
,
pSet
->
count
);
uTrace
(
"refId:%d p:%p is removed, count:
%d, free mem: %p"
,
refId
,
p
,
pSet
->
count
,
pNode
);
}
else
{
uTrace
(
"refId:%d p:%p is released"
,
refId
,
p
);
}
...
...
tests/examples/JDBC/JDBCDemo/README-jdbc-windows.md
0 → 100644
浏览文件 @
dfc7c836
# 如何在 windows环境下使用jdbc进行TDengine应用开发
本文以windows环境为例,介绍java如何进行TDengine开发应用
## 环境准备
(1)安装jdk
官网下载jdk-1.8,下载页面:https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html
安装,配置环境变量,把jdk加入到环境变量里。
命令行内查看java的版本。
```
shell
>
java
-version
java version
"1.8.0_131"
Java
(
TM
)
SE Runtime Environment
(
build 1.8.0_131-b11
)
Java HotSpot
(
TM
)
64-Bit Server VM
(
build 25.131-b11, mixed mode
)
```
(2)安装配置maven
官网下载maven,下载地址:http://maven.apache.org/download.cgi
配置环境变量MAVEN_HOME,将MAVEN_HOME/bin添加到PATH
命令行里查看maven的版本
```
shell
>
mvn
--version
Apache Maven 3.5.0
(
ff8f5e7444045639af65f6095c62210b5713f426
;
2017-04-04T03:39:06+08:00
)
Maven home: D:
\a
pache-maven-3.5.0
\b
in
\.
.
Java version: 1.8.0_131, vendor: Oracle Corporation
Java home: C:
\P
rogram Files
\J
ava
\j
dk1.8.0_131
\j
re
Default locale: zh_CN, platform encoding: GBK
OS name:
"windows 10"
, version:
"10.0"
,
arch
:
"amd64"
, family:
"windows"
```
为了加快maven下载依赖的速度,可以为maven配置mirror,修改MAVEN_HOME
\c
onfig
\s
ettings.xml文件
```
xml
<settings
xmlns=
"http://maven.apache.org/SETTINGS/1.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd"
>
<!-- 配置本地maven仓库的路径 -->
<localRepository>
D:\apache-maven-localRepository
</localRepository>
<mirrors>
<!-- 配置阿里云Maven镜像仓库 -->
<mirror>
<id>
alimaven
</id>
<name>
aliyun maven
</name>
<url>
http://maven.aliyun.com/nexus/content/groups/public/
</url>
<mirrorOf>
central
</mirrorOf>
</mirror>
</mirrors>
<profiles>
<!-- 配置jdk,maven会默认使用java1.8 -->
<profile>
<id>
jdk-1.8
</id>
<activation>
<activeByDefault>
true
</activeByDefault>
<jdk>
1.8
</jdk>
</activation>
<properties>
<maven.compiler.source>
1.8
</maven.compiler.source>
<maven.compiler.target>
1.8
</maven.compiler.target>
<maven.compiler.compilerVersion>
1.8
</maven.compiler.compilerVersion>
</properties>
</profile>
</profiles>
</settings>
```
(3)在linux服务器上安装TDengine-server
在taosdata官网下载TDengine-server,下载地址:https://www.taosdata.com/cn/all-downloads/
在linux服务器上安装TDengine-server
```
shell
# tar -zxvf package/TDengine-server-2.0.1.1-Linux-x64.tar.gz
# cd TDengine-server/
# ./install.sh
```
启动taosd
```
shell
# systemctl start taosd
```
在server上用taos连接taosd
```
shell
# taos
taos> show dnodes
;
id
| end_point | vnodes | cores | status | role | create_time |
==================================================================================================================
1 | td01:6030 | 2 | 4 | ready | any | 2020-08-19 18:40:25.045 |
Query OK, 1 row
(
s
)
in
set
(
0.005765s
)
```
如果可以正确连接到taosd实例,并打印出databases的信息,说明TDengine的server已经正确启动。这里查看server的hostname
```
shell
# hostname -f
td01
```
注意,如果安装TDengine后,使用默认的taos.cfg配置文件,taosd会使用当前server的hostname创建dnode实例。之后,在client也需要使用这个hostname来连接taosd。
(4)在windows上安装TDengine-client
在taosdata官网下载taos客户端,下载地址:
https://www.taosdata.com/cn/all-downloads/
下载后,双击exe安装。
修改client的hosts文件(C:
\W
indows
\S
ystem32
\d
rivers
\e
tc
\h
osts),将server的hostname和ip配置到client的hosts文件中
```
192.168.236.136 td01
```
配置完成后,在命令行内使用taos shell连接server端
```
shell
C:
\T
Dengine>taos
Welcome to the TDengine shell from Linux, Client Version:2.0.1.1
Copyright
(
c
)
2017 by TAOS Data, Inc. All rights reserved.
taos> show databases
;
name | created_time | ntables | vgroups | replica | quorum | days | keep1,keep2,keep
(
D
)
| cache
(
MB
)
| blocks | minrows | maxrows | wallevel | fsync | comp | precision | status |
===================================================================================================================================================================================================================================================================
test
| 2020-08-19 18:43:50.731 | 1 | 1 | 1 | 1 | 2 | 3650,3650,3650 | 16 | 6 | 100 | 4096 | 1 | 3000 | 2 | ms | ready |
log | 2020-08-19 18:40:28.064 | 4 | 1 | 1 | 1 | 10 | 30,30,30 | 1 | 3 | 100 | 4096 | 1 | 3000 | 2 | us | ready |
Query OK, 2 row
(
s
)
in
set
(
0.068000s
)
```
如果windows上的client能够正常连接,并打印database信息,说明client可以正常连接server了。
## 应用开发
(1)新建maven工程,在pom.xml中引入taos-jdbcdriver依赖。
```
xml
<?xml version="1.0" encoding="UTF-8"?>
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<modelVersion>
4.0.0
</modelVersion>
<groupId>
com.taosdata.demo
</groupId>
<artifactId>
JdbcDemo
</artifactId>
<version>
1.0-SNAPSHOT
</version>
<dependencies>
<dependency>
<groupId>
com.taosdata.jdbc
</groupId>
<artifactId>
taos-jdbcdriver
</artifactId>
<version>
2.0.8
</version>
</dependency>
</dependencies>
</project>
```
(2)使用jdbc查询TDengine数据库
下面是示例代码:
```
java
public
class
JdbcDemo
{
public
static
void
main
(
String
[]
args
)
throws
Exception
{
Connection
conn
=
getConn
();
Statement
stmt
=
conn
.
createStatement
();
// create database
stmt
.
executeUpdate
(
"create database if not exists db"
);
// use database
stmt
.
executeUpdate
(
"use db"
);
// create table
stmt
.
executeUpdate
(
"create table if not exists tb (ts timestamp, temperature int, humidity float)"
);
// insert data
int
affectedRows
=
stmt
.
executeUpdate
(
"insert into tb values(now, 23, 10.3) (now + 1s, 20, 9.3)"
);
System
.
out
.
println
(
"insert "
+
affectedRows
+
" rows."
);
// query data
ResultSet
resultSet
=
stmt
.
executeQuery
(
"select * from tb"
);
Timestamp
ts
=
null
;
int
temperature
=
0
;
float
humidity
=
0
;
while
(
resultSet
.
next
()){
ts
=
resultSet
.
getTimestamp
(
1
);
temperature
=
resultSet
.
getInt
(
2
);
humidity
=
resultSet
.
getFloat
(
"humidity"
);
System
.
out
.
printf
(
"%s, %d, %s\n"
,
ts
,
temperature
,
humidity
);
}
}
public
static
Connection
getConn
()
throws
Exception
{
Class
.
forName
(
"com.taosdata.jdbc.TSDBDriver"
);
String
jdbcUrl
=
"jdbc:TAOS://td01:0/log?user=root&password=taosdata"
;
Properties
connProps
=
new
Properties
();
connProps
.
setProperty
(
TSDBDriver
.
PROPERTY_KEY_CHARSET
,
"UTF-8"
);
connProps
.
setProperty
(
TSDBDriver
.
PROPERTY_KEY_LOCALE
,
"en_US.UTF-8"
);
connProps
.
setProperty
(
TSDBDriver
.
PROPERTY_KEY_TIME_ZONE
,
"UTC-8"
);
Connection
conn
=
DriverManager
.
getConnection
(
jdbcUrl
,
connProps
);
return
conn
;
}
}
```
(3)测试jdbc访问tdengine的sever实例
console输出:
```
insert 2 rows.
2020-08-26 00:06:34.575, 23, 10.3
2020-08-26 00:06:35.575, 20, 9.3
```
## 指南
(1)如何设置主机名和hosts
在server上查看hostname和fqdn
```
shell
查看hostname
# hostname
taos-server
查看fqdn
# hostname -f
taos-server
```
windows下hosts文件位于:
C:
\\
Windows
\S
ystem32
\d
rivers
\e
tc
\h
osts
修改hosts文件,添加server的ip和hostname
```
s
192.168.56.101
node5
```
(2)什么是fqdn?
> 什么是FQDN?
>
> FQDN(Full qualified domain name)全限定域名,fqdn由2部分组成:hostname+domainname。
>
> 例如,一个邮件服务器的fqdn可能是:mymail.somecollege.edu,其中mymail是hostname(主机名),somcollege.edu是domainname(域名)。本例中,.edu是顶级域名,.somecollege是二级域名。
>
> 当连接服务器时,必须指定fqdn,然后,dns服务器通过查看dns表,将hostname解析为相应的ip地址。如果只指定hostname(不指定domainname),应用程序可能服务解析主机名。因为如果你试图访问不在本地的远程服务器时,本地的dns服务器和可能没有远程服务器的hostname列表。
>
> 参考:https://kb.iu.edu/d/aiuv
tests/pytest/fulltest.sh
浏览文件 @
dfc7c836
...
...
@@ -151,6 +151,7 @@ python3 ./test.py -f query/select_last_crash.py
python3 ./test.py
-f
query/queryNullValueTest.py
python3 ./test.py
-f
query/queryInsertValue.py
python3 ./test.py
-f
query/queryConnection.py
python3 ./test.py
-f
query/queryCountCSVData.py
python3 ./test.py
-f
query/natualInterval.py
python3 ./test.py
-f
query/bug1471.py
...
...
tests/pytest/query/queryCountCSVData.py
0 → 100644
浏览文件 @
dfc7c836
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import
sys
import
taos
from
util.log
import
tdLog
from
util.cases
import
tdCases
from
util.sql
import
tdSql
from
util.dnodes
import
tdDnodes
class
TDTestCase
:
"""
create table and insert data from disordered.csv which timestamp is disordered and
ordered.csv which timestamp is ordered.
then execute 'select count(*) from table xx;'
"""
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
"start to execute %s"
%
__file__
)
tdSql
.
init
(
conn
.
cursor
(),
logSql
)
def
run
(
self
):
tdSql
.
prepare
()
print
(
"==============step1"
)
tdSql
.
execute
(
"create database if not exists demo;"
);
tdSql
.
execute
(
"use demo;"
)
tdSql
.
execute
(
"CREATE TABLE IF NOT EXISTS test1 (ts TIMESTAMP, ValueID int, "
"VariantValue float, Quality int, Flags int);"
)
tdSql
.
execute
(
"CREATE TABLE IF NOT EXISTS test2 (ts TIMESTAMP, ValueID int, "
"VariantValue float, Quality int, Flags int);"
)
ordered_csv
=
__file__
.
split
(
'query'
)[
0
]
+
'test_data/ordered.csv'
disordered_csv
=
__file__
.
split
(
'query'
)[
0
]
+
'test_data/disordered.csv'
tdSql
.
execute
(
" insert into test1 file '{file}';"
.
format
(
file
=
ordered_csv
))
tdSql
.
execute
(
" insert into test2 file '{file}';"
.
format
(
file
=
disordered_csv
))
print
(
"==============insert into test1 and test2 form test file"
)
print
(
"==============step2"
)
tdSql
.
query
(
'select * from test1;'
)
with
open
(
ordered_csv
)
as
f1
:
num1
=
len
(
f1
.
readlines
())
tdSql
.
checkRows
(
num1
)
tdSql
.
query
(
'select * from test2;'
)
with
open
(
disordered_csv
)
as
f2
:
num2
=
len
(
f2
.
readlines
())
tdSql
.
checkRows
(
num2
)
print
(
"=============execute select count(*) from xxx"
)
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
"%s successfully executed"
%
__file__
)
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tests/pytest/test_data/__init__.py
0 → 100644
浏览文件 @
dfc7c836
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
"""
this directory contains test data files
"""
\ No newline at end of file
tests/pytest/test_data/disordered.csv
0 → 100644
浏览文件 @
dfc7c836
此差异已折叠。
点击以展开。
tests/pytest/test_data/ordered.csv
0 → 100644
浏览文件 @
dfc7c836
此差异已折叠。
点击以展开。
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录