Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8ce2e124
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
8ce2e124
编写于
11月 04, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
11月 04, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #17886 from taosdata/feature/3_liaohj
fix(query): set the last key of each tablescan info
上级
19775920
9c33d10a
变更
6
展开全部
隐藏空白更改
内联
并排
Showing
6 changed file
with
357 addition
and
337 deletion
+357
-337
include/common/tcommon.h
include/common/tcommon.h
+3
-3
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+8
-5
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+26
-21
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+2
-2
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+6
-3
tests/system-test/1-insert/delete_data.py
tests/system-test/1-insert/delete_data.py
+312
-303
未找到文件。
include/common/tcommon.h
浏览文件 @
8ce2e124
...
...
@@ -225,13 +225,13 @@ typedef struct SVarColAttr {
// pBlockAgg->numOfNull == info.rows, all data are null
// pBlockAgg->numOfNull == 0, no data are null.
typedef
struct
SColumnInfoData
{
SColumnInfo
info
;
// column info
bool
hasNull
;
// if current column data has null value.
char
*
pData
;
// the corresponding block data in memory
char
*
pData
;
// the corresponding block data in memory
union
{
char
*
nullbitmap
;
// bitmap, one bit for each item in the list
SVarColAttr
varmeta
;
};
SColumnInfo
info
;
// column info
bool
hasNull
;
// if current column data has null value.
}
SColumnInfoData
;
typedef
struct
SQueryTableDataCond
{
...
...
source/client/test/clientTests.cpp
浏览文件 @
8ce2e124
...
...
@@ -112,7 +112,7 @@ void createNewTable(TAOS* pConn, int32_t index) {
}
taos_free_result
(
pRes
);
for
(
int32_t
i
=
0
;
i
<
1000
0
;
i
+=
20
)
{
for
(
int32_t
i
=
0
;
i
<
2
0
;
i
+=
20
)
{
char
sql
[
1024
]
=
{
0
};
sprintf
(
sql
,
"insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
...
...
@@ -692,6 +692,7 @@ TEST(testCase, insert_test) {
taos_free_result(pRes);
taos_close(pConn);
}
#endif
TEST
(
testCase
,
projection_query_tables
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
...
...
@@ -703,7 +704,7 @@ TEST(testCase, projection_query_tables) {
// }
// taos_free_result(pRes);
TAOS_RES* pRes = taos_query(pConn, "use
benchmarkcpu
");
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use
abc2
"
);
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create stable st1 (ts timestamp, k int) tags(a int)"
);
...
...
@@ -725,7 +726,7 @@ TEST(testCase, projection_query_tables) {
}
taos_free_result
(
pRes
);
for (int32_t i = 0; i < 2; ++i) {
for
(
int32_t
i
=
0
;
i
<
2
00000
;
++
i
)
{
printf
(
"create table :%d
\n
"
,
i
);
createNewTable
(
pConn
,
i
);
}
...
...
@@ -750,7 +751,9 @@ TEST(testCase, projection_query_tables) {
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
#endif
#if 0
TEST(testCase, tsbs_perf_test) {
TdThread qid[20] = {0};
...
...
@@ -761,7 +764,7 @@ TEST(testCase, tsbs_perf_test) {
getchar();
}
#if 0
TEST(testCase, projection_query_stables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
8ce2e124
...
...
@@ -185,11 +185,11 @@ static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STabl
SRowMerger
*
pMerger
,
SVersionRange
*
pVerRange
);
static
int32_t
doMergeRowsInBuf
(
SIterInfo
*
pIter
,
uint64_t
uid
,
int64_t
ts
,
SArray
*
pDelList
,
SRowMerger
*
pMerger
,
STsdbReader
*
pReader
);
static
int32_t
doAppendRowFromTSRow
(
SSDataBlock
*
pBlock
,
STsdbReader
*
pReader
,
STSRow
*
pTSRow
,
uint64_t
uid
);
static
int32_t
doAppendRowFromTSRow
(
SSDataBlock
*
pBlock
,
STsdbReader
*
pReader
,
STSRow
*
pTSRow
,
STableBlockScanInfo
*
pInfo
);
static
int32_t
doAppendRowFromFileBlock
(
SSDataBlock
*
pResBlock
,
STsdbReader
*
pReader
,
SBlockData
*
pBlockData
,
int32_t
rowIndex
);
static
void
setComposedBlockFlag
(
STsdbReader
*
pReader
,
bool
composed
);
static
bool
hasBeenDropped
(
const
SArray
*
pDelList
,
int32_t
*
index
,
TSDBKEY
*
pKey
,
int32_t
order
,
SVersionRange
*
pRange
);
static
bool
hasBeenDropped
(
const
SArray
*
pDelList
,
int32_t
*
index
,
TSDBKEY
*
pKey
,
int32_t
order
,
SVersionRange
*
p
Ver
Range
);
static
int32_t
doMergeMemTableMultiRows
(
TSDBROW
*
pRow
,
uint64_t
uid
,
SIterInfo
*
pIter
,
SArray
*
pDelList
,
STSRow
**
pTSRow
,
STsdbReader
*
pReader
,
bool
*
freeTSRow
);
...
...
@@ -208,7 +208,6 @@ static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static
int32_t
doBuildDataBlock
(
STsdbReader
*
pReader
);
static
TSDBKEY
getCurrentKeyInBuf
(
STableBlockScanInfo
*
pScanInfo
,
STsdbReader
*
pReader
);
static
bool
hasDataInFileBlock
(
const
SBlockData
*
pBlockData
,
const
SFileBlockDumpInfo
*
pDumpInfo
);
static
bool
hasDataInLastBlock
(
SLastBlockReader
*
pLastBlockReader
);
static
bool
outOfTimeWindow
(
int64_t
ts
,
STimeWindow
*
pWindow
)
{
return
(
ts
>
pWindow
->
ekey
)
||
(
ts
<
pWindow
->
skey
);
}
...
...
@@ -534,7 +533,7 @@ static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity)
}
for
(
int32_t
i
=
0
;
i
<
pCond
->
numOfCols
;
++
i
)
{
SColumnInfoData
colInfo
=
{
{
0
},
0
};
SColumnInfoData
colInfo
=
{
0
,
{
0
}
};
colInfo
.
info
=
pCond
->
colList
[
i
];
blockDataAppendColInfo
(
pResBlock
,
&
colInfo
);
}
...
...
@@ -1529,8 +1528,8 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
// opt version
// 1. it is not a border point
// 2. the direct next point is not an duplicated timestamp
if
((
pDumpInfo
->
rowIndex
<
pDumpInfo
->
totalRows
-
1
&&
pReader
->
order
==
TSDB_ORDER_ASC
)
||
(
pDumpInfo
->
rowIndex
>
0
&&
pReader
->
order
==
TSDB_ORDER_DESC
))
{
bool
asc
=
(
pReader
->
order
==
TSDB_ORDER_ASC
);
if
((
pDumpInfo
->
rowIndex
<
pDumpInfo
->
totalRows
-
1
&&
asc
)
||
(
pDumpInfo
->
rowIndex
>
0
&&
(
!
asc
)
))
{
int32_t
step
=
pReader
->
order
==
TSDB_ORDER_ASC
?
1
:
-
1
;
int64_t
nextKey
=
pBlockData
->
aTSKEY
[
pDumpInfo
->
rowIndex
+
step
];
...
...
@@ -1749,7 +1748,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return
code
;
}
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
->
uid
);
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
);
taosMemoryFree
(
pTSRow
);
tRowMergerClear
(
&
merge
);
...
...
@@ -1770,6 +1769,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
// only last block exists
if
((
!
mergeBlockData
)
||
(
tsLastBlock
!=
pBlockData
->
aTSKEY
[
pDumpInfo
->
rowIndex
]))
{
if
(
tryCopyDistinctRowFromSttBlock
(
&
fRow
,
pLastBlockReader
,
pBlockScanInfo
,
tsLastBlock
,
pReader
))
{
pBlockScanInfo
->
lastKey
=
tsLastBlock
;
return
TSDB_CODE_SUCCESS
;
}
else
{
int32_t
code
=
tRowMergerInit
(
&
merge
,
&
fRow
,
pReader
->
pSchema
);
...
...
@@ -1786,7 +1786,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
return
code
;
}
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
->
uid
);
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
);
taosMemoryFree
(
pTSRow
);
tRowMergerClear
(
&
merge
);
...
...
@@ -1810,7 +1810,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
return
code
;
}
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
->
uid
);
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
);
taosMemoryFree
(
pTSRow
);
tRowMergerClear
(
&
merge
);
...
...
@@ -1858,7 +1858,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
return
code
;
}
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
->
uid
);
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
);
taosMemoryFree
(
pTSRow
);
tRowMergerClear
(
&
merge
);
...
...
@@ -2082,7 +2082,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
return
code
;
}
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
->
uid
);
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
);
taosMemoryFree
(
pTSRow
);
tRowMergerClear
(
&
merge
);
...
...
@@ -2233,6 +2233,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
STsdbReader
*
pReader
)
{
SFileBlockDumpInfo
*
pDumpInfo
=
&
pReader
->
status
.
fBlockDumpInfo
;
if
(
tryCopyDistinctRowFromFileBlock
(
pReader
,
pBlockData
,
key
,
pDumpInfo
))
{
pBlockScanInfo
->
lastKey
=
key
;
return
TSDB_CODE_SUCCESS
;
}
else
{
TSDBROW
fRow
=
tsdbRowFromBlockData
(
pBlockData
,
pDumpInfo
->
rowIndex
);
...
...
@@ -2251,7 +2252,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
return
code
;
}
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
->
uid
);
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
);
taosMemoryFree
(
pTSRow
);
tRowMergerClear
(
&
merge
);
...
...
@@ -2299,29 +2300,32 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
SFileDataBlockInfo
*
pBlockInfo
=
getCurrentBlockInfo
(
&
pReader
->
status
.
blockIter
);
SLastBlockReader
*
pLastBlockReader
=
pReader
->
status
.
fileIter
.
pLastBlockReader
;
bool
asc
=
ASCENDING_TRAVERSE
(
pReader
->
order
);
int64_t
st
=
taosGetTimestampUs
();
int32_t
step
=
asc
?
1
:
-
1
;
STableBlockScanInfo
*
pBlockScanInfo
=
NULL
;
if
(
pBlockInfo
!=
NULL
)
{
pBlockScanInfo
=
*
(
STableBlockScanInfo
**
)
taosHashGet
(
pReader
->
status
.
pTableMap
,
&
pBlockInfo
->
uid
,
sizeof
(
pBlockInfo
->
uid
));
if
(
p
BlockScanInfo
==
NULL
)
{
void
*
p
=
taosHashGet
(
pReader
->
status
.
pTableMap
,
&
pBlockInfo
->
uid
,
sizeof
(
pBlockInfo
->
uid
));
if
(
p
==
NULL
)
{
code
=
TSDB_CODE_INVALID_PARA
;
tsdbError
(
"failed to locate the uid:%"
PRIu64
" in query table uid list, total tables:%d, %s"
,
pBlockInfo
->
uid
,
taosHashGetSize
(
pReader
->
status
.
pTableMap
),
pReader
->
idStr
);
goto
_end
;
}
pBlockScanInfo
=
*
(
STableBlockScanInfo
**
)
p
;
SDataBlk
*
pBlock
=
getCurrentBlock
(
&
pReader
->
status
.
blockIter
);
TSDBKEY
keyInBuf
=
getCurrentKeyInBuf
(
pBlockScanInfo
,
pReader
);
// it is a clean block, load it directly
if
(
isCleanFileDataBlock
(
pReader
,
pBlockInfo
,
pBlock
,
pBlockScanInfo
,
keyInBuf
,
pLastBlockReader
))
{
if
(
pReader
->
order
==
TSDB_ORDER_ASC
||
(
pReader
->
order
==
TSDB_ORDER_DESC
&&
(
!
hasDataInLastBlock
(
pLastBlockReader
))))
{
if
(
asc
||
((
!
asc
)
&&
(
!
hasDataInLastBlock
(
pLastBlockReader
))))
{
copyBlockDataToSDataBlock
(
pReader
,
pBlockScanInfo
);
// record the last key value
pBlockScanInfo
->
lastKey
=
ASCENDING_TRAVERSE
(
pReader
->
order
)
?
pBlock
->
maxKey
.
ts
:
pBlock
->
minKey
.
ts
;
pBlockScanInfo
->
lastKey
=
asc
?
pBlock
->
maxKey
.
ts
:
pBlock
->
minKey
.
ts
;
goto
_end
;
}
}
...
...
@@ -2331,7 +2335,6 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
SFileBlockDumpInfo
*
pDumpInfo
=
&
pReader
->
status
.
fBlockDumpInfo
;
SBlockData
*
pBlockData
=
&
pReader
->
status
.
fileBlockData
;
int32_t
step
=
ASCENDING_TRAVERSE
(
pReader
->
order
)
?
1
:
-
1
;
while
(
1
)
{
bool
hasBlockData
=
false
;
...
...
@@ -3220,7 +3223,6 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
int32_t
doMergeRowsInLastBlock
(
SLastBlockReader
*
pLastBlockReader
,
STableBlockScanInfo
*
pScanInfo
,
int64_t
ts
,
SRowMerger
*
pMerger
,
SVersionRange
*
pVerRange
)
{
pScanInfo
->
lastKey
=
ts
;
while
(
nextRowFromLastBlocks
(
pLastBlockReader
,
pScanInfo
,
pVerRange
))
{
int64_t
next1
=
getCurrentKeyInLastBlock
(
pLastBlockReader
);
if
(
next1
==
ts
)
{
...
...
@@ -3413,9 +3415,10 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
return
TSDB_CODE_SUCCESS
;
}
int32_t
doAppendRowFromTSRow
(
SSDataBlock
*
pBlock
,
STsdbReader
*
pReader
,
STSRow
*
pTSRow
,
uint64_t
uid
)
{
int32_t
doAppendRowFromTSRow
(
SSDataBlock
*
pBlock
,
STsdbReader
*
pReader
,
STSRow
*
pTSRow
,
STableBlockScanInfo
*
pScanInfo
)
{
int32_t
numOfRows
=
pBlock
->
info
.
rows
;
int32_t
numOfCols
=
(
int32_t
)
taosArrayGetSize
(
pBlock
->
pDataBlock
);
int64_t
uid
=
pScanInfo
->
uid
;
SBlockLoadSuppInfo
*
pSupInfo
=
&
pReader
->
suppInfo
;
STSchema
*
pSchema
=
doGetSchemaForTSRow
(
pTSRow
->
sver
,
pReader
,
uid
);
...
...
@@ -3454,6 +3457,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow*
}
pBlock
->
info
.
rows
+=
1
;
pScanInfo
->
lastKey
=
pTSRow
->
ts
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -3517,7 +3521,8 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
break
;
}
doAppendRowFromTSRow
(
pBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
->
uid
);
doAppendRowFromTSRow
(
pBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
);
if
(
freeTSRow
)
{
taosMemoryFree
(
pTSRow
);
}
...
...
source/libs/executor/src/executil.c
浏览文件 @
8ce2e124
...
...
@@ -421,7 +421,7 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, int64_t suid, SArray*
}
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
ctx
.
cInfoList
);
++
i
)
{
SColumnInfoData
colInfo
=
{
{
0
},
0
};
SColumnInfoData
colInfo
=
{
0
,
{
0
}
};
colInfo
.
info
=
*
(
SColumnInfo
*
)
taosArrayGet
(
ctx
.
cInfoList
,
i
);
blockDataAppendColInfo
(
pResBlock
,
&
colInfo
);
}
...
...
@@ -582,7 +582,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
}
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
ctx
.
cInfoList
);
++
i
)
{
SColumnInfoData
colInfo
=
{
{
0
},
0
};
SColumnInfoData
colInfo
=
{
0
,
{
0
}
};
colInfo
.
info
=
*
(
SColumnInfo
*
)
taosArrayGet
(
ctx
.
cInfoList
,
i
);
blockDataAppendColInfo
(
pResBlock
,
&
colInfo
);
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
8ce2e124
...
...
@@ -2783,8 +2783,10 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul
*
defaultPgsz
<<=
1u
;
}
// The default buffer for each operator in query is 10MB.
// at least four pages need to be in buffer
*
defaultBufsz
=
4096
*
256
;
// TODO: make this variable to be configurable.
*
defaultBufsz
=
4096
*
2560
;
if
((
*
defaultBufsz
)
<=
(
*
defaultPgsz
))
{
(
*
defaultBufsz
)
=
(
*
defaultPgsz
)
*
4
;
}
...
...
@@ -2971,7 +2973,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
return
pOperator
;
_error:
_error:
if
(
pInfo
!=
NULL
)
{
destroyAggOperatorInfo
(
pInfo
);
}
...
...
@@ -3187,11 +3189,12 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
_error:
_error:
if
(
pInfo
!=
NULL
)
{
destroyFillOperatorInfo
(
pInfo
);
}
pTaskInfo
->
code
=
code
;
taosMemoryFreeClear
(
pOperator
);
return
NULL
;
}
...
...
tests/system-test/1-insert/delete_data.py
浏览文件 @
8ce2e124
此差异已折叠。
点击以展开。
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录