Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
92804d31
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
92804d31
编写于
11月 01, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into feature/stream
上级
5ff701ef
20539ae0
变更
30
隐藏空白更改
内联
并排
Showing
30 changed file
with
405 addition
and
276 deletion
+405
-276
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+3
-0
include/libs/function/functionMgt.h
include/libs/function/functionMgt.h
+1
-0
include/util/taoserror.h
include/util/taoserror.h
+1
-0
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+5
-0
source/dnode/mgmt/CMakeLists.txt
source/dnode/mgmt/CMakeLists.txt
+15
-1
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+1
-1
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+62
-63
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+27
-3
source/libs/catalog/src/ctgUtil.c
source/libs/catalog/src/ctgUtil.c
+5
-0
source/libs/catalog/test/catalogTests.cpp
source/libs/catalog/test/catalogTests.cpp
+20
-0
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-1
source/libs/executor/src/cachescanoperator.c
source/libs/executor/src/cachescanoperator.c
+9
-8
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+63
-57
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+4
-51
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+68
-60
source/libs/function/CMakeLists.txt
source/libs/function/CMakeLists.txt
+38
-8
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+1
-1
source/libs/function/src/functionMgt.c
source/libs/function/src/functionMgt.c
+7
-0
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+1
-1
source/libs/planner/src/planLogicCreater.c
source/libs/planner/src/planLogicCreater.c
+5
-1
source/libs/qcom/src/queryUtil.c
source/libs/qcom/src/queryUtil.c
+1
-0
source/libs/scalar/src/filter.c
source/libs/scalar/src/filter.c
+11
-9
source/libs/scalar/src/scalar.c
source/libs/scalar/src/scalar.c
+4
-0
source/libs/wal/src/walMeta.c
source/libs/wal/src/walMeta.c
+32
-0
source/libs/wal/src/walWrite.c
source/libs/wal/src/walWrite.c
+1
-1
source/libs/wal/test/walMetaTest.cpp
source/libs/wal/test/walMetaTest.cpp
+7
-7
source/util/src/terror.c
source/util/src/terror.c
+1
-0
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+1
-1
tests/system-test/7-tmq/tmqDnodeRestart1.py
tests/system-test/7-tmq/tmqDnodeRestart1.py
+1
-1
tools/shell/CMakeLists.txt
tools/shell/CMakeLists.txt
+9
-1
未找到文件。
include/libs/catalog/catalog.h
浏览文件 @
92804d31
...
...
@@ -309,6 +309,9 @@ int32_t catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* f
int32_t
catalogChkAuth
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
char
*
user
,
const
char
*
dbFName
,
AUTH_TYPE
type
,
bool
*
pass
);
int32_t
catalogChkAuthFromCache
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
char
*
user
,
const
char
*
dbFName
,
AUTH_TYPE
type
,
bool
*
pass
,
bool
*
exists
);
int32_t
catalogUpdateUserAuthInfo
(
SCatalog
*
pCtg
,
SGetUserAuthRsp
*
pAuth
);
int32_t
catalogUpdateVgEpSet
(
SCatalog
*
pCtg
,
const
char
*
dbFName
,
int32_t
vgId
,
SEpSet
*
epSet
);
...
...
include/libs/function/functionMgt.h
浏览文件 @
92804d31
...
...
@@ -216,6 +216,7 @@ bool fmIsMultiRowsFunc(int32_t funcId);
bool
fmIsKeepOrderFunc
(
int32_t
funcId
);
bool
fmIsCumulativeFunc
(
int32_t
funcId
);
bool
fmIsInterpPseudoColumnFunc
(
int32_t
funcId
);
bool
fmIsGroupKeyFunc
(
int32_t
funcId
);
void
getLastCacheDataType
(
SDataType
*
pType
);
...
...
include/util/taoserror.h
浏览文件 @
92804d31
...
...
@@ -451,6 +451,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_WAL_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x1004)
#define TSDB_CODE_WAL_LOG_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x1005)
#define TSDB_CODE_WAL_CHKSUM_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x1006)
#define TSDB_CODE_WAL_LOG_INCOMPLETE TAOS_DEF_ERROR_CODE(0, 0x1007)
// tfs
#define TSDB_CODE_FS_INVLD_CFG TAOS_DEF_ERROR_CODE(0, 0x2201)
...
...
source/common/src/tdatablock.c
浏览文件 @
92804d31
...
...
@@ -1132,6 +1132,10 @@ void blockDataCleanup(SSDataBlock* pDataBlock) {
pDataBlock
->
info
.
window
.
ekey
=
0
;
pDataBlock
->
info
.
window
.
skey
=
0
;
if
(
pDataBlock
->
info
.
capacity
==
0
)
{
return
;
}
size_t
numOfCols
=
taosArrayGetSize
(
pDataBlock
->
pDataBlock
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
*
p
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
i
);
...
...
@@ -1186,6 +1190,7 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo*
}
void
colInfoDataCleanup
(
SColumnInfoData
*
pColumn
,
uint32_t
numOfRows
)
{
pColumn
->
hasNull
=
false
;
if
(
IS_VAR_DATA_TYPE
(
pColumn
->
info
.
type
))
{
pColumn
->
varmeta
.
length
=
0
;
if
(
pColumn
->
varmeta
.
offset
!=
NULL
)
{
...
...
source/dnode/mgmt/CMakeLists.txt
浏览文件 @
92804d31
...
...
@@ -13,4 +13,18 @@ target_include_directories(
taosd
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/node_mgmt/inc"
)
target_link_libraries
(
taosd dnode
)
IF
(
TD_LINUX_64 AND JEMALLOC_ENABLED
)
ADD_DEFINITIONS
(
-DTD_JEMALLOC_ENABLED -I
${
CMAKE_BINARY_DIR
}
/build/include -L
${
CMAKE_BINARY_DIR
}
/build/lib -Wl,-rpath,
${
CMAKE_BINARY_DIR
}
/build/lib -ljemalloc
)
SET
(
LINK_JEMALLOC
"-L
${
CMAKE_BINARY_DIR
}
/build/lib -ljemalloc"
)
ELSE
()
SET
(
LINK_JEMALLOC
""
)
ENDIF
()
IF
(
TD_LINUX_64 AND JEMALLOC_ENABLED
)
ADD_DEPENDENCIES
(
taosd jemalloc
)
target_link_libraries
(
taosd dnode
${
LINK_JEMALLOC
}
)
ELSE
()
target_link_libraries
(
taosd dnode
)
ENDIF
()
source/dnode/vnode/inc/vnode.h
浏览文件 @
92804d31
...
...
@@ -159,7 +159,7 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableL
void
tsdbReaderClose
(
STsdbReader
*
pReader
);
bool
tsdbNextDataBlock
(
STsdbReader
*
pReader
);
bool
tsdbTableNextDataBlock
(
STsdbReader
*
pReader
,
uint64_t
uid
);
void
tsdbRetrieveDataBlockInfo
(
STsdbReader
*
pReader
,
SDataBlockInfo
*
pDataBlockInfo
);
void
tsdbRetrieveDataBlockInfo
(
const
STsdbReader
*
pReader
,
int32_t
*
rows
,
uint64_t
*
uid
,
STimeWindow
*
pWindow
);
int32_t
tsdbRetrieveDatablockSMA
(
STsdbReader
*
pReader
,
SColumnDataAgg
***
pBlockStatis
,
bool
*
allHave
);
SArray
*
tsdbRetrieveDataBlock
(
STsdbReader
*
pTsdbReadHandle
,
SArray
*
pColumnIdList
);
int32_t
tsdbReaderReset
(
STsdbReader
*
pReader
,
SQueryTableDataCond
*
pCond
);
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
92804d31
...
...
@@ -35,11 +35,17 @@ typedef struct {
int32_t
numOfLastFiles
;
}
SBlockNumber
;
typedef
struct
SBlockIndex
{
int32_t
ordinalIndex
;
int64_t
inFileOffset
;
STimeWindow
window
;
}
SBlockIndex
;
typedef
struct
STableBlockScanInfo
{
uint64_t
uid
;
TSKEY
lastKey
;
SMapData
mapData
;
// block info (compressed)
SArray
*
pBlockList
;
// block data index list
SArray
*
pBlockList
;
// block data index list
, SArray<SBlockIndex>
SIterInfo
iter
;
// mem buffer skip list iterator
SIterInfo
iiter
;
// imem buffer skip list iterator
SArray
*
delSkyline
;
// delete info for this table
...
...
@@ -568,7 +574,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
STableBlockScanInfo
*
pScanInfo
=
p
;
if
(
pScanInfo
->
pBlockList
==
NULL
)
{
pScanInfo
->
pBlockList
=
taosArrayInit
(
4
,
sizeof
(
int32_t
));
pScanInfo
->
pBlockList
=
taosArrayInit
(
4
,
sizeof
(
SBlockIndex
));
}
taosArrayPush
(
pIndexList
,
pBlockIdx
);
...
...
@@ -630,7 +636,10 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
continue
;
}
void
*
p
=
taosArrayPush
(
pScanInfo
->
pBlockList
,
&
j
);
SBlockIndex
bIndex
=
{.
ordinalIndex
=
j
,
.
inFileOffset
=
block
.
aSubBlock
->
offset
};
bIndex
.
window
=
(
STimeWindow
)
{.
skey
=
block
.
minKey
.
ts
,
.
ekey
=
block
.
maxKey
.
ts
};
void
*
p
=
taosArrayPush
(
pScanInfo
->
pBlockList
,
&
bIndex
);
if
(
p
==
NULL
)
{
tMapDataClear
(
&
pScanInfo
->
mapData
);
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -821,7 +830,8 @@ int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SData
}
else
if
(
!
asc
&&
pReader
->
window
.
skey
<=
pBlock
->
minKey
.
ts
)
{
endPos
=
0
;
}
else
{
endPos
=
doBinarySearchKey
(
pBlockData
->
aTSKEY
,
pBlock
->
nRow
,
pos
,
pReader
->
window
.
ekey
,
pReader
->
order
);
int64_t
key
=
asc
?
pReader
->
window
.
ekey
:
pReader
->
window
.
skey
;
endPos
=
doBinarySearchKey
(
pBlockData
->
aTSKEY
,
pBlock
->
nRow
,
pos
,
key
,
pReader
->
order
);
}
return
endPos
;
...
...
@@ -852,8 +862,9 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
// pDumpInfo->rowIndex = pBlock->nRow - 1;
}
else
{
int32_t
pos
=
asc
?
pBlock
->
nRow
-
1
:
0
;
int32_t
order
=
(
pReader
->
order
==
TSDB_ORDER_ASC
)
?
TSDB_ORDER_DESC
:
TSDB_ORDER_ASC
;
pDumpInfo
->
rowIndex
=
doBinarySearchKey
(
pBlockData
->
aTSKEY
,
pBlock
->
nRow
,
pos
,
pReader
->
window
.
skey
,
order
);
int32_t
order
=
asc
?
TSDB_ORDER_DESC
:
TSDB_ORDER_ASC
;
int64_t
key
=
asc
?
pReader
->
window
.
skey
:
pReader
->
window
.
ekey
;
pDumpInfo
->
rowIndex
=
doBinarySearchKey
(
pBlockData
->
aTSKEY
,
pBlock
->
nRow
,
pos
,
key
,
order
);
}
}
...
...
@@ -1059,8 +1070,8 @@ static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr)
return
TSDB_CODE_INVALID_PARA
;
}
int32_t
*
mapData
Index
=
taosArrayGet
(
pScanInfo
->
pBlockList
,
pBlockInfo
->
tbBlockIdx
);
tMapDataGetItemByIdx
(
&
pScanInfo
->
mapData
,
*
mapData
Index
,
&
pBlockIter
->
block
,
tGetDataBlk
);
SBlockIndex
*
p
Index
=
taosArrayGet
(
pScanInfo
->
pBlockList
,
pBlockInfo
->
tbBlockIdx
);
tMapDataGetItemByIdx
(
&
pScanInfo
->
mapData
,
pIndex
->
ordinal
Index
,
&
pBlockIter
->
block
,
tGetDataBlk
);
}
#if 0
...
...
@@ -1073,6 +1084,7 @@ static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr)
static
int32_t
initBlockIterator
(
STsdbReader
*
pReader
,
SDataBlockIter
*
pBlockIter
,
int32_t
numOfBlocks
)
{
bool
asc
=
ASCENDING_TRAVERSE
(
pReader
->
order
);
SBlockOrderSupporter
sup
=
{
0
};
pBlockIter
->
numOfBlocks
=
numOfBlocks
;
taosArrayClear
(
pBlockIter
->
blockList
);
pBlockIter
->
pTableMap
=
pReader
->
status
.
pTableMap
;
...
...
@@ -1081,9 +1093,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
int32_t
numOfTables
=
(
int32_t
)
taosHashGetSize
(
pReader
->
status
.
pTableMap
);
int64_t
st
=
taosGetTimestampUs
();
SBlockOrderSupporter
sup
=
{
0
};
int32_t
code
=
initBlockOrderSupporter
(
&
sup
,
numOfTables
);
int32_t
code
=
initBlockOrderSupporter
(
&
sup
,
numOfTables
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
@@ -1111,17 +1121,11 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
}
sup
.
pDataBlockInfo
[
sup
.
numOfTables
]
=
(
SBlockOrderWrapper
*
)
buf
;
SDataBlk
block
=
{
0
};
for
(
int32_t
k
=
0
;
k
<
num
;
++
k
)
{
SBlockOrderWrapper
wrapper
=
{
0
};
int32_t
*
mapDataIndex
=
taosArrayGet
(
pTableScanInfo
->
pBlockList
,
k
);
tMapDataGetItemByIdx
(
&
pTableScanInfo
->
mapData
,
*
mapDataIndex
,
&
block
,
tGetDataBlk
);
wrapper
.
uid
=
pTableScanInfo
->
uid
;
wrapper
.
offset
=
block
.
aSubBlock
[
0
].
offset
;
sup
.
pDataBlockInfo
[
sup
.
numOfTables
][
k
]
=
wrapper
;
for
(
int32_t
k
=
0
;
k
<
num
;
++
k
)
{
SBlockIndex
*
pIndex
=
taosArrayGet
(
pTableScanInfo
->
pBlockList
,
k
);
sup
.
pDataBlockInfo
[
sup
.
numOfTables
][
k
]
=
(
SBlockOrderWrapper
){.
uid
=
pTableScanInfo
->
uid
,
.
offset
=
pIndex
->
inFileOffset
};
cnt
++
;
}
...
...
@@ -1212,25 +1216,22 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p
(
pVerRange
->
maxVer
<
pBlock
->
maxVer
&&
pVerRange
->
maxVer
>=
pBlock
->
minVer
);
}
static
SDataBlk
*
getNeighborBlockOfSameTable
(
SFileDataBlockInfo
*
pBlockInfo
,
STableBlockScanInfo
*
pTableBlockScanInfo
,
int32_t
*
nextIndex
,
int32_t
order
)
{
static
bool
getNeighborBlockOfSameTable
(
SFileDataBlockInfo
*
pBlockInfo
,
STableBlockScanInfo
*
pTableBlockScanInfo
,
int32_t
*
nextIndex
,
int32_t
order
,
SBlockIndex
*
pBlockIndex
)
{
bool
asc
=
ASCENDING_TRAVERSE
(
order
);
if
(
asc
&&
pBlockInfo
->
tbBlockIdx
>=
taosArrayGetSize
(
pTableBlockScanInfo
->
pBlockList
)
-
1
)
{
return
NULL
;
return
false
;
}
if
(
!
asc
&&
pBlockInfo
->
tbBlockIdx
==
0
)
{
return
NULL
;
return
false
;
}
int32_t
step
=
asc
?
1
:
-
1
;
*
nextIndex
=
pBlockInfo
->
tbBlockIdx
+
step
;
SDataBlk
*
pBlock
=
taosMemoryCalloc
(
1
,
sizeof
(
SDataBlk
));
int32_t
*
indexInMapdata
=
taosArrayGet
(
pTableBlockScanInfo
->
pBlockList
,
*
nextIndex
);
tMapDataGetItemByIdx
(
&
pTableBlockScanInfo
->
mapData
,
*
indexInMapdata
,
pBlock
,
tGetDataBlk
);
return
pBlock
;
*
pBlockIndex
=
*
(
SBlockIndex
*
)
taosArrayGet
(
pTableBlockScanInfo
->
pBlockList
,
*
nextIndex
);
// tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, pIndex->ordinalIndex, pBlock, tGetDataBlk);
return
true
;
}
static
int32_t
findFileBlockInfoIndex
(
SDataBlockIter
*
pBlockIter
,
SFileDataBlockInfo
*
pFBlockInfo
)
{
...
...
@@ -1272,12 +1273,12 @@ static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t
return
TSDB_CODE_SUCCESS
;
}
static
bool
overlapWithNeighborBlock
(
SDataBlk
*
pBlock
,
S
DataBlk
*
pNeighbor
,
int32_t
order
)
{
static
bool
overlapWithNeighborBlock
(
SDataBlk
*
pBlock
,
S
BlockIndex
*
pNeighborBlockIndex
,
int32_t
order
)
{
// it is the last block in current file, no chance to overlap with neighbor blocks.
if
(
ASCENDING_TRAVERSE
(
order
))
{
return
pBlock
->
maxKey
.
ts
==
pNeighbor
->
minKey
.
ts
;
return
pBlock
->
maxKey
.
ts
==
pNeighbor
BlockIndex
->
window
.
skey
;
}
else
{
return
pBlock
->
minKey
.
ts
==
pNeighbor
->
maxKey
.
ts
;
return
pBlock
->
minKey
.
ts
==
pNeighbor
BlockIndex
->
window
.
ekey
;
}
}
...
...
@@ -1364,13 +1365,14 @@ typedef struct {
static
void
getBlockToLoadInfo
(
SDataBlockToLoadInfo
*
pInfo
,
SFileDataBlockInfo
*
pBlockInfo
,
SDataBlk
*
pBlock
,
STableBlockScanInfo
*
pScanInfo
,
TSDBKEY
keyInBuf
,
SLastBlockReader
*
pLastBlockReader
,
STsdbReader
*
pReader
)
{
int32_t
neighborIndex
=
0
;
SDataBlk
*
pNeighbor
=
getNeighborBlockOfSameTable
(
pBlockInfo
,
pScanInfo
,
&
neighborIndex
,
pReader
->
order
);
int32_t
neighborIndex
=
0
;
SBlockIndex
bIndex
=
{
0
};
bool
hasNeighbor
=
getNeighborBlockOfSameTable
(
pBlockInfo
,
pScanInfo
,
&
neighborIndex
,
pReader
->
order
,
&
bIndex
);
// overlap with neighbor
if
(
pNeighbor
)
{
pInfo
->
overlapWithNeighborBlock
=
overlapWithNeighborBlock
(
pBlock
,
pNeighbor
,
pReader
->
order
);
taosMemoryFree
(
pNeighbor
);
if
(
hasNeighbor
)
{
pInfo
->
overlapWithNeighborBlock
=
overlapWithNeighborBlock
(
pBlock
,
&
bIndex
,
pReader
->
order
);
}
// has duplicated ts of different version in this block
...
...
@@ -3067,15 +3069,15 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
*
state
=
CHECK_FILEBLOCK_QUIT
;
int32_t
step
=
ASCENDING_TRAVERSE
(
pReader
->
order
)
?
1
:
-
1
;
int32_t
nextIndex
=
-
1
;
SDataBlk
*
pNeighborBlock
=
getNeighborBlockOfSameTable
(
pFBlock
,
pScanInfo
,
&
nextIndex
,
pReader
->
order
);
if
(
pNeighborBlock
==
NULL
)
{
// do nothing
int32_t
nextIndex
=
-
1
;
SBlockIndex
bIndex
=
{
0
};
bool
hasNeighbor
=
getNeighborBlockOfSameTable
(
pFBlock
,
pScanInfo
,
&
nextIndex
,
pReader
->
order
,
&
bIndex
);
if
(
!
hasNeighbor
)
{
// do nothing
return
0
;
}
bool
overlap
=
overlapWithNeighborBlock
(
pBlock
,
pNeighborBlock
,
pReader
->
order
);
taosMemoryFree
(
pNeighborBlock
);
bool
overlap
=
overlapWithNeighborBlock
(
pBlock
,
&
bIndex
,
pReader
->
order
);
if
(
overlap
)
{
// load next block
SReaderStatus
*
pStatus
=
&
pReader
->
status
;
SDataBlockIter
*
pBlockIter
=
&
pStatus
->
blockIter
;
...
...
@@ -3496,7 +3498,6 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
initFilesetIterator
(
&
pReader
->
status
.
fileIter
,
pReader
->
pReadSnap
->
fs
.
aDFileSet
,
pReader
);
resetDataBlockIterator
(
&
pReader
->
status
.
blockIter
,
pReader
->
order
);
// resetDataBlockScanInfo(pReader->status.pTableMap, pReader->window.skey);
// no data in files, let's try buffer in memory
if
(
pReader
->
status
.
fileIter
.
numOfFiles
==
0
)
{
...
...
@@ -3799,24 +3800,24 @@ bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) {
return
true
;
}
static
void
setBlockInfo
(
STsdbReader
*
pReader
,
SDataBlockInfo
*
pDataBlockInfo
)
{
ASSERT
(
p
DataBlockInfo
!=
NULL
&&
p
Reader
!=
NULL
);
pDataBlockInfo
->
rows
=
pReader
->
pResBlock
->
info
.
rows
;
pDataBlockInfo
->
uid
=
pReader
->
pResBlock
->
info
.
uid
;
pDataBlockInfo
->
w
indow
=
pReader
->
pResBlock
->
info
.
window
;
static
void
setBlockInfo
(
const
STsdbReader
*
pReader
,
int32_t
*
rows
,
uint64_t
*
uid
,
STimeWindow
*
pWindow
)
{
ASSERT
(
pReader
!=
NULL
);
*
rows
=
pReader
->
pResBlock
->
info
.
rows
;
*
uid
=
pReader
->
pResBlock
->
info
.
uid
;
*
pW
indow
=
pReader
->
pResBlock
->
info
.
window
;
}
void
tsdbRetrieveDataBlockInfo
(
STsdbReader
*
pReader
,
SDataBlockInfo
*
pDataBlockInfo
)
{
void
tsdbRetrieveDataBlockInfo
(
const
STsdbReader
*
pReader
,
int32_t
*
rows
,
uint64_t
*
uid
,
STimeWindow
*
pWindow
)
{
if
(
pReader
->
type
==
TIMEWINDOW_RANGE_EXTERNAL
)
{
if
(
pReader
->
step
==
EXTERNAL_ROWS_MAIN
)
{
setBlockInfo
(
pReader
,
pDataBlockInfo
);
setBlockInfo
(
pReader
,
rows
,
uid
,
pWindow
);
}
else
if
(
pReader
->
step
==
EXTERNAL_ROWS_PREV
)
{
setBlockInfo
(
pReader
->
innerReader
[
0
],
pDataBlockInfo
);
setBlockInfo
(
pReader
->
innerReader
[
0
],
rows
,
uid
,
pWindow
);
}
else
{
setBlockInfo
(
pReader
->
innerReader
[
1
],
pDataBlockInfo
);
setBlockInfo
(
pReader
->
innerReader
[
1
],
rows
,
uid
,
pWindow
);
}
}
else
{
setBlockInfo
(
pReader
,
pDataBlockInfo
);
setBlockInfo
(
pReader
,
rows
,
uid
,
pWindow
);
}
}
...
...
@@ -3838,7 +3839,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
SFileDataBlockInfo
*
pFBlock
=
getCurrentBlockInfo
(
&
pReader
->
status
.
blockIter
);
SDataBlk
*
pBlock
=
getCurrentBlock
(
&
pReader
->
status
.
blockIter
);
int64_t
stime
=
taosGetTimestampUs
();
//
int64_t stime = taosGetTimestampUs();
SBlockLoadSuppInfo
*
pSup
=
&
pReader
->
suppInfo
;
...
...
@@ -3869,7 +3870,9 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
size_t
numOfCols
=
blockDataGetNumOfCols
(
pReader
->
pResBlock
);
int32_t
i
=
0
,
j
=
0
;
while
(
j
<
numOfCols
&&
i
<
taosArrayGetSize
(
pSup
->
pColAgg
))
{
size_t
size
=
taosArrayGetSize
(
pSup
->
pColAgg
);
while
(
j
<
numOfCols
&&
i
<
size
)
{
SColumnDataAgg
*
pAgg
=
taosArrayGet
(
pSup
->
pColAgg
,
i
);
if
(
pAgg
->
colId
==
pSup
->
colIds
[
j
])
{
if
(
IS_BSMA_ON
(
&
(
pReader
->
pSchema
->
columns
[
i
])))
{
...
...
@@ -3886,14 +3889,10 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
}
}
double
elapsed
=
(
taosGetTimestampUs
()
-
stime
)
/
1000
.
0
;
pReader
->
cost
.
smaLoadTime
+=
elapsed
;
pReader
->
cost
.
smaDataLoad
+=
1
;
*
pBlockStatis
=
pSup
->
plist
;
tsdbDebug
(
"vgId:%d, succeed to load block SMA for uid %"
PRIu64
", elapsed time:%.2f ms, %s"
,
0
,
pFBlock
->
uid
,
elapsed
,
pReader
->
idStr
);
tsdbDebug
(
"vgId:%d, succeed to load block SMA for uid %"
PRIu64
", %s"
,
0
,
pFBlock
->
uid
,
pReader
->
idStr
);
return
code
;
}
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
92804d31
...
...
@@ -320,7 +320,7 @@ _return:
}
int32_t
ctgChkAuth
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
char
*
user
,
const
char
*
dbFName
,
AUTH_TYPE
type
,
bool
*
pass
)
{
bool
*
pass
,
bool
*
exists
)
{
bool
inCache
=
false
;
int32_t
code
=
0
;
...
...
@@ -329,6 +329,13 @@ int32_t ctgChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, co
CTG_ERR_RET
(
ctgChkAuthFromCache
(
pCtg
,
(
char
*
)
user
,
(
char
*
)
dbFName
,
type
,
&
inCache
,
pass
));
if
(
inCache
)
{
if
(
exists
)
{
*
exists
=
true
;
}
return
TSDB_CODE_SUCCESS
;
}
else
if
(
exists
)
{
*
exists
=
false
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1032,7 +1039,7 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, SRequestConnInfo* pConn, SArray*
switch
(
tbType
)
{
case
TSDB_CHILD_TABLE
:
{
SName
stb
=
name
;
strcpy
(
stb
.
tname
,
stbName
);
tstrncpy
(
stb
.
tname
,
stbName
,
sizeof
(
stb
.
tname
)
);
ctgRemoveTbMeta
(
pCtg
,
&
stb
);
break
;
}
...
...
@@ -1373,13 +1380,30 @@ int32_t catalogChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user
}
int32_t
code
=
0
;
CTG_ERR_JRET
(
ctgChkAuth
(
pCtg
,
pConn
,
user
,
dbFName
,
type
,
pass
));
CTG_ERR_JRET
(
ctgChkAuth
(
pCtg
,
pConn
,
user
,
dbFName
,
type
,
pass
,
NULL
));
_return:
CTG_API_LEAVE
(
code
);
}
int32_t
catalogChkAuthFromCache
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
char
*
user
,
const
char
*
dbFName
,
AUTH_TYPE
type
,
bool
*
pass
,
bool
*
exists
)
{
CTG_API_ENTER
();
if
(
NULL
==
pCtg
||
NULL
==
pConn
||
NULL
==
user
||
NULL
==
dbFName
||
NULL
==
pass
||
NULL
==
exists
)
{
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
int32_t
code
=
0
;
CTG_ERR_JRET
(
ctgChkAuth
(
pCtg
,
pConn
,
user
,
dbFName
,
type
,
pass
,
exists
));
_return:
CTG_API_LEAVE
(
code
);
}
int32_t
catalogGetServerVersion
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
char
**
pVersion
)
{
CTG_API_ENTER
();
...
...
source/libs/catalog/src/ctgUtil.c
浏览文件 @
92804d31
...
...
@@ -924,6 +924,11 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo*
if
(
1
==
vgNum
)
{
void
*
pIter
=
taosHashIterate
(
dbInfo
->
vgHash
,
NULL
);
if
(
NULL
==
pIter
)
{
ctgError
(
"empty vgHash, db:%s, vgroup number:%d"
,
dbFName
,
vgNum
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
for
(
int32_t
i
=
0
;
i
<
tbNum
;
++
i
)
{
vgInfo
=
taosMemoryMalloc
(
sizeof
(
SVgroupInfo
));
if
(
NULL
==
vgInfo
)
{
...
...
source/libs/catalog/test/catalogTests.cpp
浏览文件 @
92804d31
...
...
@@ -2771,10 +2771,30 @@ TEST(apiTest, catalogChkAuth_test) {
ASSERT_EQ
(
code
,
0
);
bool
pass
=
false
;
bool
exists
=
false
;
code
=
catalogChkAuthFromCache
(
pCtg
,
mockPointer
,
ctgTestUsername
,
ctgTestDbname
,
AUTH_TYPE_READ
,
&
pass
,
&
exists
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
exists
,
false
);
code
=
catalogChkAuth
(
pCtg
,
mockPointer
,
ctgTestUsername
,
ctgTestDbname
,
AUTH_TYPE_READ
,
&
pass
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
pass
,
true
);
while
(
true
)
{
uint64_t
n
=
0
;
ctgdGetStatNum
(
"runtime.numOfOpDequeue"
,
(
void
*
)
&
n
);
if
(
n
!=
1
)
{
taosMsleep
(
50
);
}
else
{
break
;
}
}
code
=
catalogChkAuthFromCache
(
pCtg
,
mockPointer
,
ctgTestUsername
,
ctgTestDbname
,
AUTH_TYPE_READ
,
&
pass
,
&
exists
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
pass
,
true
);
ASSERT_EQ
(
exists
,
true
);
catalogDestroy
();
}
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
92804d31
...
...
@@ -918,7 +918,7 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul
void
doSetOperatorCompleted
(
SOperatorInfo
*
pOperator
);
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
,
SColMatchInfo
*
pColMatchInfo
,
SFilterInfo
*
pFilterInfo
);
int32_t
addTagPseudoColumnData
(
SReadHandle
*
pHandle
,
SExprInfo
*
pPseudoExpr
,
int32_t
numOfPseudoExpr
,
SSDataBlock
*
pBlock
,
const
char
*
idStr
);
SSDataBlock
*
pBlock
,
int32_t
rows
,
const
char
*
idStr
);
void
cleanupAggSup
(
SAggSupporter
*
pAggSup
);
void
appendOneRowToDataBlock
(
SSDataBlock
*
pBlock
,
STupleHandle
*
pTupleHandle
);
...
...
source/libs/executor/src/cachescanoperator.c
浏览文件 @
92804d31
...
...
@@ -152,25 +152,27 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
pInfo
->
indexOfBufferedRes
=
0
;
}
SSDataBlock
*
pRes
=
pInfo
->
pRes
;
if
(
pInfo
->
indexOfBufferedRes
<
pInfo
->
pBufferredRes
->
info
.
rows
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pInfo
->
matchInfo
.
pList
);
++
i
)
{
SColMatchItem
*
pMatchInfo
=
taosArrayGet
(
pInfo
->
matchInfo
.
pList
,
i
);
int32_t
slotId
=
pMatchInfo
->
dstSlotId
;
SColumnInfoData
*
pSrc
=
taosArrayGet
(
pInfo
->
pBufferredRes
->
pDataBlock
,
slotId
);
SColumnInfoData
*
pDst
=
taosArrayGet
(
p
Info
->
p
Res
->
pDataBlock
,
slotId
);
SColumnInfoData
*
pDst
=
taosArrayGet
(
pRes
->
pDataBlock
,
slotId
);
char
*
p
=
colDataGetData
(
pSrc
,
pInfo
->
indexOfBufferedRes
);
bool
isNull
=
colDataIsNull_s
(
pSrc
,
pInfo
->
indexOfBufferedRes
);
colDataAppend
(
pDst
,
0
,
p
,
isNull
);
}
p
Info
->
p
Res
->
info
.
uid
=
*
(
tb_uid_t
*
)
taosArrayGet
(
pInfo
->
pUidList
,
pInfo
->
indexOfBufferedRes
);
p
Info
->
p
Res
->
info
.
rows
=
1
;
pRes
->
info
.
uid
=
*
(
tb_uid_t
*
)
taosArrayGet
(
pInfo
->
pUidList
,
pInfo
->
indexOfBufferedRes
);
pRes
->
info
.
rows
=
1
;
if
(
pInfo
->
pseudoExprSup
.
numOfExprs
>
0
)
{
SExprSupp
*
pSup
=
&
pInfo
->
pseudoExprSup
;
int32_t
code
=
addTagPseudoColumnData
(
&
pInfo
->
readHandle
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
p
Info
->
pRe
s
,
int32_t
code
=
addTagPseudoColumnData
(
&
pInfo
->
readHandle
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
p
Res
,
pRes
->
info
.
row
s
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pTaskInfo
->
code
=
code
;
...
...
@@ -178,10 +180,9 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
}
}
pInfo
->
pRes
->
info
.
groupId
=
getTableGroupId
(
pTableList
,
pInfo
->
pRes
->
info
.
uid
);
pRes
->
info
.
groupId
=
getTableGroupId
(
pTableList
,
pRes
->
info
.
uid
);
pInfo
->
indexOfBufferedRes
+=
1
;
return
p
Info
->
p
Res
;
return
pRes
;
}
else
{
doSetOperatorCompleted
(
pOperator
);
return
NULL
;
...
...
@@ -221,7 +222,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
ASSERT
((
pInfo
->
retrieveType
&
CACHESCAN_RETRIEVE_LAST_ROW
)
==
CACHESCAN_RETRIEVE_LAST_ROW
);
pInfo
->
pRes
->
info
.
uid
=
*
(
tb_uid_t
*
)
taosArrayGet
(
pInfo
->
pUidList
,
0
);
code
=
addTagPseudoColumnData
(
&
pInfo
->
readHandle
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
pInfo
->
pRes
,
code
=
addTagPseudoColumnData
(
&
pInfo
->
readHandle
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
pInfo
->
pRes
,
pInfo
->
pRes
->
info
.
rows
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pTaskInfo
->
code
=
code
;
...
...
source/libs/executor/src/executil.c
浏览文件 @
92804d31
...
...
@@ -775,12 +775,67 @@ end:
return
code
;
}
static
int32_t
nameComparFn
(
const
void
*
p1
,
const
void
*
p2
)
{
const
char
*
pName1
=
*
(
const
char
**
)
p1
;
const
char
*
pName2
=
*
(
const
char
**
)
p2
;
int32_t
ret
=
strcmp
(
pName1
,
pName2
);
if
(
ret
==
0
)
{
return
0
;
}
else
{
return
(
ret
>
0
)
?
1
:
-
1
;
}
}
static
SArray
*
getTableNameList
(
const
SNodeListNode
*
pList
)
{
int32_t
len
=
LIST_LENGTH
(
pList
->
pNodeList
);
SListCell
*
cell
=
pList
->
pNodeList
->
pHead
;
SArray
*
pTbList
=
taosArrayInit
(
len
,
POINTER_BYTES
);
for
(
int
i
=
0
;
i
<
pList
->
pNodeList
->
length
;
i
++
)
{
SValueNode
*
valueNode
=
(
SValueNode
*
)
cell
->
pNode
;
if
(
!
IS_VAR_DATA_TYPE
(
valueNode
->
node
.
resType
.
type
))
{
terrno
=
TSDB_CODE_INVALID_PARA
;
taosArrayDestroy
(
pTbList
);
return
NULL
;
}
char
*
name
=
varDataVal
(
valueNode
->
datum
.
p
);
taosArrayPush
(
pTbList
,
&
name
);
cell
=
cell
->
pNext
;
}
size_t
numOfTables
=
taosArrayGetSize
(
pTbList
);
// order the name
taosArraySort
(
pTbList
,
nameComparFn
);
// remove the duplicates
SArray
*
pNewList
=
taosArrayInit
(
taosArrayGetSize
(
pTbList
),
sizeof
(
void
*
));
taosArrayPush
(
pNewList
,
taosArrayGet
(
pTbList
,
0
));
for
(
int32_t
i
=
1
;
i
<
numOfTables
;
++
i
)
{
char
**
name
=
taosArrayGetLast
(
pNewList
);
char
**
nameInOldList
=
taosArrayGet
(
pTbList
,
i
);
if
(
strcmp
(
*
name
,
*
nameInOldList
)
==
0
)
{
continue
;
}
taosArrayPush
(
pNewList
,
nameInOldList
);
}
taosArrayDestroy
(
pTbList
);
return
pNewList
;
}
static
int
tableUidCompare
(
const
void
*
a
,
const
void
*
b
)
{
int64_t
u1
=
*
(
uint64_t
*
)
a
;
int64_t
u2
=
*
(
uint64_t
*
)
b
;
uint64_t
u1
=
*
(
uint64_t
*
)
a
;
uint64_t
u2
=
*
(
uint64_t
*
)
b
;
if
(
u1
==
u2
)
{
return
0
;
}
return
u1
<
u2
?
-
1
:
1
;
}
...
...
@@ -803,7 +858,9 @@ static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list
SNodeList
*
pList
=
(
SNodeList
*
)
pNode
->
pParameterList
;
int32_t
len
=
LIST_LENGTH
(
pList
);
if
(
len
<=
0
)
return
ret
;
if
(
len
<=
0
)
{
return
ret
;
}
SListCell
*
cell
=
pList
->
pHead
;
for
(
int
i
=
0
;
i
<
len
;
i
++
)
{
...
...
@@ -814,6 +871,7 @@ static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list
}
cell
=
cell
->
pNext
;
}
taosArraySort
(
list
,
tableUidCompare
);
taosArrayRemoveDuplicate
(
list
,
tableUidCompare
,
NULL
);
...
...
@@ -821,6 +879,7 @@ static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list
ret
=
metaGetTableTagsByUids
(
metaHandle
,
suid
,
list
,
tags
);
removeInvalidTable
(
list
,
tags
);
}
return
ret
;
}
...
...
@@ -844,59 +903,6 @@ static int32_t removeInvalidTable(SArray* uids, SHashObj* tags) {
return
0
;
}
static
int32_t
nameComparFn
(
const
void
*
p1
,
const
void
*
p2
)
{
const
char
*
pName1
=
*
(
const
char
**
)
p1
;
const
char
*
pName2
=
*
(
const
char
**
)
p2
;
int32_t
ret
=
strcmp
(
pName1
,
pName2
);
if
(
ret
==
0
)
{
return
0
;
}
else
{
return
(
ret
>
0
)
?
1
:
-
1
;
}
}
static
SArray
*
getTableNameList
(
const
SNodeListNode
*
pList
)
{
int32_t
len
=
LIST_LENGTH
(
pList
->
pNodeList
);
SListCell
*
cell
=
pList
->
pNodeList
->
pHead
;
SArray
*
pTbList
=
taosArrayInit
(
len
,
POINTER_BYTES
);
for
(
int
i
=
0
;
i
<
pList
->
pNodeList
->
length
;
i
++
)
{
SValueNode
*
valueNode
=
(
SValueNode
*
)
cell
->
pNode
;
if
(
!
IS_VAR_DATA_TYPE
(
valueNode
->
node
.
resType
.
type
))
{
terrno
=
TSDB_CODE_INVALID_PARA
;
taosArrayDestroy
(
pTbList
);
return
NULL
;
}
char
*
name
=
varDataVal
(
valueNode
->
datum
.
p
);
taosArrayPush
(
pTbList
,
&
name
);
cell
=
cell
->
pNext
;
}
size_t
numOfTables
=
taosArrayGetSize
(
pTbList
);
// order the name
taosArraySort
(
pTbList
,
nameComparFn
);
// remove the duplicates
SArray
*
pNewList
=
taosArrayInit
(
taosArrayGetSize
(
pTbList
),
sizeof
(
void
*
));
taosArrayPush
(
pNewList
,
taosArrayGet
(
pTbList
,
0
));
for
(
int32_t
i
=
1
;
i
<
numOfTables
;
++
i
)
{
char
**
name
=
taosArrayGetLast
(
pNewList
);
char
**
nameInOldList
=
taosArrayGet
(
pTbList
,
i
);
if
(
strcmp
(
*
name
,
*
nameInOldList
)
==
0
)
{
continue
;
}
taosArrayPush
(
pNewList
,
nameInOldList
);
}
taosArrayDestroy
(
pTbList
);
return
pNewList
;
}
static
int32_t
optimizeTbnameInCondImpl
(
void
*
metaHandle
,
int64_t
suid
,
SArray
*
list
,
SNode
*
pTagCond
)
{
if
(
nodeType
(
pTagCond
)
!=
QUERY_NODE_OPERATOR
)
{
return
-
1
;
...
...
@@ -1757,7 +1763,7 @@ STableListInfo* tableListCreate() {
goto
_error
;
}
pListInfo
->
map
=
taosHashInit
(
32
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BI
NARY
),
false
,
HASH_ENTRY_LOCK
);
pListInfo
->
map
=
taosHashInit
(
32
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BI
GINT
),
false
,
HASH_ENTRY_LOCK
);
if
(
pListInfo
->
map
==
NULL
)
{
goto
_error
;
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
92804d31
...
...
@@ -574,6 +574,9 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
// if the source equals to the destination, it is to create a new column as the result of scalar
// function or some operators.
bool
createNewColModel
=
(
pResult
==
pSrcBlock
);
if
(
createNewColModel
)
{
blockDataEnsureCapacity
(
pResult
,
pResult
->
info
.
rows
);
}
int32_t
numOfRows
=
0
;
...
...
@@ -623,6 +626,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
int32_t
startOffset
=
createNewColModel
?
0
:
pResult
->
info
.
rows
;
ASSERT
(
pResult
->
info
.
capacity
>
0
);
colDataMergeCol
(
pResColData
,
startOffset
,
(
int32_t
*
)
&
pResult
->
info
.
capacity
,
&
idata
,
dest
.
numOfRows
);
colDataDestroy
(
&
idata
);
...
...
@@ -844,57 +848,6 @@ STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int
return
win
;
}
#if 0
static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockInfo) {
STimeWindow w = {0};
TSKEY sk = TMIN(pQueryAttr->window.skey, pQueryAttr->window.ekey);
TSKEY ek = TMAX(pQueryAttr->window.skey, pQueryAttr->window.ekey);
if (true) {
// getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.skey, sk, ek, &w);
assert(w.ekey >= pBlockInfo->window.skey);
if (w.ekey < pBlockInfo->window.ekey) {
return true;
}
while (1) {
// getNextTimeWindow(pQueryAttr, &w);
if (w.skey > pBlockInfo->window.ekey) {
break;
}
assert(w.ekey > pBlockInfo->window.ekey);
if (w.skey <= pBlockInfo->window.ekey && w.skey > pBlockInfo->window.skey) {
return true;
}
}
} else {
// getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.ekey, sk, ek, &w);
assert(w.skey <= pBlockInfo->window.ekey);
if (w.skey > pBlockInfo->window.skey) {
return true;
}
while (1) {
// getNextTimeWindow(pQueryAttr, &w);
if (w.ekey < pBlockInfo->window.skey) {
break;
}
assert(w.skey < pBlockInfo->window.skey);
if (w.ekey < pBlockInfo->window.ekey && w.ekey >= pBlockInfo->window.skey) {
return true;
}
}
}
return false;
}
#endif
int32_t
loadDataBlockOnDemand
(
SExecTaskInfo
*
pTaskInfo
,
STableScanInfo
*
pTableScanInfo
,
SSDataBlock
*
pBlock
,
uint32_t
*
status
)
{
*
status
=
BLK_DATA_NOT_LOAD
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
92804d31
...
...
@@ -331,7 +331,8 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
}
}
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pTableScanInfo
->
matchInfo
.
pList
);
++
i
)
{
size_t
num
=
taosArrayGetSize
(
pTableScanInfo
->
matchInfo
.
pList
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SColMatchItem
*
pColMatchInfo
=
taosArrayGet
(
pTableScanInfo
->
matchInfo
.
pList
,
i
);
if
(
!
pColMatchInfo
->
needOutput
)
{
continue
;
...
...
@@ -343,11 +344,11 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
return
true
;
}
static
void
doSetTagColumnData
(
STableScanInfo
*
pTableScanInfo
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
)
{
static
void
doSetTagColumnData
(
STableScanInfo
*
pTableScanInfo
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
,
int32_t
rows
)
{
if
(
pTableScanInfo
->
pseudoSup
.
numOfExprs
>
0
)
{
SExprSupp
*
pSup
=
&
pTableScanInfo
->
pseudoSup
;
int32_t
code
=
addTagPseudoColumnData
(
&
pTableScanInfo
->
readHandle
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
pBlock
,
int32_t
code
=
addTagPseudoColumnData
(
&
pTableScanInfo
->
readHandle
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
pBlock
,
rows
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
...
...
@@ -382,6 +383,17 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
}
}
static
void
ensureBlockCapacity
(
SSDataBlock
*
pBlock
,
int32_t
capacity
)
{
// keep the value of rows temporarily
int32_t
rows
=
pBlock
->
info
.
rows
;
pBlock
->
info
.
rows
=
0
;
blockDataEnsureCapacity
(
pBlock
,
capacity
);
// restore the rows number
pBlock
->
info
.
rows
=
rows
;
}
static
int32_t
loadDataBlock
(
SOperatorInfo
*
pOperator
,
STableScanInfo
*
pTableScanInfo
,
SSDataBlock
*
pBlock
,
uint32_t
*
status
)
{
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
...
...
@@ -413,7 +425,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
qDebug
(
"%s data block skipped, brange:%"
PRId64
"-%"
PRId64
", rows:%d"
,
GET_TASKID
(
pTaskInfo
),
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
pBlockInfo
->
rows
);
doSetTagColumnData
(
pTableScanInfo
,
pBlock
,
pTaskInfo
);
if
(
pTableScanInfo
->
pseudoSup
.
numOfExprs
>
0
)
{
ensureBlockCapacity
(
pBlock
,
pBlock
->
info
.
rows
);
}
doSetTagColumnData
(
pTableScanInfo
,
pBlock
,
pTaskInfo
,
1
);
pCost
->
skipBlocks
+=
1
;
return
TSDB_CODE_SUCCESS
;
}
else
if
(
*
status
==
FUNC_DATA_REQUIRED_STATIS_LOAD
)
{
...
...
@@ -423,7 +438,11 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
if
(
success
)
{
// failed to load the block sma data, data block statistics does not exist, load data block instead
qDebug
(
"%s data block SMA loaded, brange:%"
PRId64
"-%"
PRId64
", rows:%d"
,
GET_TASKID
(
pTaskInfo
),
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
pBlockInfo
->
rows
);
doSetTagColumnData
(
pTableScanInfo
,
pBlock
,
pTaskInfo
);
if
(
pTableScanInfo
->
pseudoSup
.
numOfExprs
>
0
)
{
ensureBlockCapacity
(
pBlock
,
pBlock
->
info
.
rows
);
}
doSetTagColumnData
(
pTableScanInfo
,
pBlock
,
pTaskInfo
,
1
);
return
TSDB_CODE_SUCCESS
;
}
else
{
qDebug
(
"%s failed to load SMA, since not all columns have SMA"
,
GET_TASKID
(
pTaskInfo
));
...
...
@@ -472,8 +491,9 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
return
terrno
;
}
ensureBlockCapacity
(
pBlock
,
pBlock
->
info
.
rows
);
relocateColumnData
(
pBlock
,
pTableScanInfo
->
matchInfo
.
pList
,
pCols
,
true
);
doSetTagColumnData
(
pTableScanInfo
,
pBlock
,
pTaskInfo
);
doSetTagColumnData
(
pTableScanInfo
,
pBlock
,
pTaskInfo
,
pBlock
->
info
.
rows
);
// restore the previous value
pCost
->
totalRows
-=
pBlock
->
info
.
rows
;
...
...
@@ -513,27 +533,30 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
}
int32_t
addTagPseudoColumnData
(
SReadHandle
*
pHandle
,
SExprInfo
*
pPseudoExpr
,
int32_t
numOfPseudoExpr
,
SSDataBlock
*
pBlock
,
const
char
*
idStr
)
{
SSDataBlock
*
pBlock
,
int32_t
rows
,
const
char
*
idStr
)
{
// currently only the tbname pseudo column
if
(
numOfPseudoExpr
==
0
)
{
return
TSDB_CODE_SUCCESS
;
}
// backup the rows
int32_t
backupRows
=
pBlock
->
info
.
rows
;
pBlock
->
info
.
rows
=
rows
;
SMetaReader
mr
=
{
0
};
metaReaderInit
(
&
mr
,
pHandle
->
meta
,
0
);
int32_t
code
=
metaGetTableEntryByUid
(
&
mr
,
pBlock
->
info
.
uid
);
metaReaderReleaseLock
(
&
mr
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to get table meta, uid:0x%"
PRIx64
", code:%s, %s"
,
pBlock
->
info
.
uid
,
tstrerror
(
terrno
),
idStr
);
metaReaderClear
(
&
mr
);
return
terrno
;
}
metaReaderReleaseLock
(
&
mr
);
for
(
int32_t
j
=
0
;
j
<
numOfPseudoExpr
;
++
j
)
{
SExprInfo
*
pExpr
=
&
pPseudoExpr
[
j
];
int32_t
dstSlotId
=
pExpr
->
base
.
resSchema
.
slotId
;
int32_t
dstSlotId
=
pExpr
->
base
.
resSchema
.
slotId
;
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
dstSlotId
);
colInfoDataCleanup
(
pColInfoData
,
pBlock
->
info
.
rows
);
...
...
@@ -572,6 +595,9 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int
}
metaReaderClear
(
&
mr
);
// restore the rows
pBlock
->
info
.
rows
=
backupRows
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -616,14 +642,10 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
blockDataCleanup
(
pBlock
);
SDataBlockInfo
binfo
=
pBlock
->
info
;
tsdbRetrieveDataBlockInfo
(
pTableScanInfo
->
dataReader
,
&
binfo
);
SDataBlockInfo
*
pBInfo
=
&
pBlock
->
info
;
tsdbRetrieveDataBlockInfo
(
pTableScanInfo
->
dataReader
,
&
pBInfo
->
rows
,
&
pBInfo
->
uid
,
&
pBInfo
->
window
);
binfo
.
capacity
=
binfo
.
rows
;
blockDataEnsureCapacity
(
pBlock
,
binfo
.
rows
);
pBlock
->
info
=
binfo
;
ASSERT
(
binfo
.
uid
!=
0
);
ASSERT
(
pBInfo
->
uid
!=
0
);
pBlock
->
info
.
groupId
=
getTableGroupId
(
pTaskInfo
->
pTableInfoList
,
pBlock
->
info
.
uid
);
uint32_t
status
=
0
;
...
...
@@ -1146,20 +1168,19 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
bool
hasBlock
=
tsdbNextDataBlock
(
pReader
);
if
(
hasBlock
)
{
SDataBlockInfo
binfo
=
{
0
};
tsdbRetrieveDataBlockInfo
(
pReader
,
&
binfo
);
SDataBlockInfo
*
pBInfo
=
&
pBlock
->
info
;
SArray
*
pCols
=
tsdbRetrieveDataBlock
(
pReader
,
NULL
)
;
blockDataEnsureCapacity
(
pBlock
,
binfo
.
rows
);
int32_t
rows
=
0
;
tsdbRetrieveDataBlockInfo
(
pReader
,
&
rows
,
&
pBInfo
->
uid
,
&
pBInfo
->
window
);
pBlock
->
info
.
window
=
binfo
.
window
;
pBlock
->
info
.
uid
=
binfo
.
uid
;
pBlock
->
info
.
rows
=
binfo
.
rows
;
SArray
*
pCols
=
tsdbRetrieveDataBlock
(
pReader
,
NULL
)
;
blockDataEnsureCapacity
(
pBlock
,
rows
)
;
pBlock
->
info
.
rows
=
rows
;
relocateColumnData
(
pBlock
,
pTableScanInfo
->
matchInfo
.
pList
,
pCols
,
true
);
doSetTagColumnData
(
pTableScanInfo
,
pBlock
,
pTaskInfo
);
doSetTagColumnData
(
pTableScanInfo
,
pBlock
,
pTaskInfo
,
rows
);
pBlock
->
info
.
groupId
=
getTableGroupId
(
pTaskInfo
->
pTableInfoList
,
binfo
.
uid
);
pBlock
->
info
.
groupId
=
getTableGroupId
(
pTaskInfo
->
pTableInfoList
,
pBInfo
->
uid
);
}
tsdbReaderClose
(
pReader
);
...
...
@@ -1181,12 +1202,6 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts,
static
uint64_t
getGroupIdByUid
(
SStreamScanInfo
*
pInfo
,
uint64_t
uid
)
{
return
getTableGroupId
(
pInfo
->
pTableScanOp
->
pTaskInfo
->
pTableInfoList
,
uid
);
// SHashObj* map = pInfo->pTableScanOp->pTaskInfo->pTableInfoList.map;
// uint64_t* groupId = taosHashGet(map, &uid, sizeof(int64_t));
// if (groupId) {
// return *groupId;
// }
// return 0;
}
static
uint64_t
getGroupIdByData
(
SStreamScanInfo
*
pInfo
,
uint64_t
uid
,
TSKEY
ts
,
int64_t
maxVersion
)
{
...
...
@@ -1631,7 +1646,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
// currently only the tbname pseudo column
if
(
pInfo
->
numOfPseudoExpr
>
0
)
{
int32_t
code
=
addTagPseudoColumnData
(
&
pInfo
->
readHandle
,
pInfo
->
pPseudoExpr
,
pInfo
->
numOfPseudoExpr
,
pInfo
->
pRes
,
GET_TASKID
(
pTaskInfo
));
pInfo
->
pRes
->
info
.
rows
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
blockDataFreeRes
((
SSDataBlock
*
)
pBlock
);
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
...
...
@@ -1641,11 +1656,11 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
if
(
filter
)
{
doFilter
(
pInfo
->
pCondition
,
pInfo
->
pRes
,
NULL
,
NULL
);
}
blockDataUpdateTsWindow
(
pInfo
->
pRes
,
pInfo
->
primaryTsIndex
);
blockDataFreeRes
((
SSDataBlock
*
)
pBlock
);
calBlockTbName
(
&
pInfo
->
tbnameCalSup
,
pInfo
->
pRes
);
return
0
;
}
...
...
@@ -2151,7 +2166,9 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_TSC_QUERY_CANCELLED
);
}
tsdbRetrieveDataBlockInfo
(
pInfo
->
dataReader
,
&
pBlock
->
info
);
int32_t
rows
=
0
;
tsdbRetrieveDataBlockInfo
(
pInfo
->
dataReader
,
&
rows
,
&
pBlock
->
info
.
uid
,
&
pBlock
->
info
.
window
);
pBlock
->
info
.
rows
=
rows
;
SArray
*
pCols
=
tsdbRetrieveDataBlock
(
pInfo
->
dataReader
,
NULL
);
pBlock
->
pDataBlock
=
pCols
;
...
...
@@ -4368,7 +4385,7 @@ static int32_t loadDataBlockFromOneTable2(SOperatorInfo* pOperator, STableMergeS
// currently only the tbname pseudo column
if
(
pTableScanInfo
->
pseudoSup
.
numOfExprs
>
0
)
{
int32_t
code
=
addTagPseudoColumnData
(
&
pTableScanInfo
->
readHandle
,
pTableScanInfo
->
pseudoSup
.
pExprInfo
,
pTableScanInfo
->
pseudoSup
.
numOfExprs
,
pBlock
,
GET_TASKID
(
pTaskInfo
));
pTableScanInfo
->
pseudoSup
.
numOfExprs
,
pBlock
,
pBlock
->
info
.
rows
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
...
...
@@ -4485,7 +4502,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
// currently only the tbname pseudo column
if
(
pTableScanInfo
->
pseudoSup
.
numOfExprs
>
0
)
{
int32_t
code
=
addTagPseudoColumnData
(
&
pTableScanInfo
->
readHandle
,
pTableScanInfo
->
pseudoSup
.
pExprInfo
,
pTableScanInfo
->
pseudoSup
.
numOfExprs
,
pBlock
,
GET_TASKID
(
pTaskInfo
));
pTableScanInfo
->
pseudoSup
.
numOfExprs
,
pBlock
,
pBlock
->
info
.
rows
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
...
...
@@ -4549,14 +4566,11 @@ static SSDataBlock* getTableDataBlockTemp(void* param) {
}
blockDataCleanup
(
pBlock
);
SDataBlockInfo
binfo
=
pBlock
->
info
;
tsdbRetrieveDataBlockInfo
(
reader
,
&
binfo
);
blockDataEnsureCapacity
(
pBlock
,
binfo
.
rows
);
pBlock
->
info
.
type
=
binfo
.
type
;
pBlock
->
info
.
uid
=
binfo
.
uid
;
pBlock
->
info
.
window
=
binfo
.
window
;
pBlock
->
info
.
rows
=
binfo
.
rows
;
int32_t
rows
=
0
;
tsdbRetrieveDataBlockInfo
(
reader
,
&
rows
,
&
pBlock
->
info
.
uid
,
&
pBlock
->
info
.
window
);
blockDataEnsureCapacity
(
pBlock
,
rows
);
pBlock
->
info
.
rows
=
rows
;
if
(
pQueryCond
->
order
==
TSDB_ORDER_ASC
)
{
pQueryCond
->
twindows
.
skey
=
pBlock
->
info
.
window
.
ekey
+
1
;
...
...
@@ -4612,14 +4626,11 @@ static SSDataBlock* getTableDataBlock2(void* param) {
}
blockDataCleanup
(
pBlock
);
SDataBlockInfo
binfo
=
pBlock
->
info
;
tsdbRetrieveDataBlockInfo
(
reader
,
&
binfo
);
blockDataEnsureCapacity
(
pBlock
,
binfo
.
rows
);
pBlock
->
info
.
type
=
binfo
.
type
;
pBlock
->
info
.
uid
=
binfo
.
uid
;
pBlock
->
info
.
window
=
binfo
.
window
;
pBlock
->
info
.
rows
=
binfo
.
rows
;
int32_t
rows
=
0
;
tsdbRetrieveDataBlockInfo
(
reader
,
&
rows
,
&
pBlock
->
info
.
uid
,
&
pBlock
->
info
.
window
);
blockDataEnsureCapacity
(
pBlock
,
rows
);
pBlock
->
info
.
rows
=
rows
;
uint32_t
status
=
0
;
int32_t
code
=
loadDataBlockFromOneTable2
(
pOperator
,
pTableScanInfo
,
pBlock
,
&
status
);
...
...
@@ -4665,14 +4676,11 @@ static SSDataBlock* getTableDataBlock(void* param) {
}
blockDataCleanup
(
pBlock
);
SDataBlockInfo
binfo
=
pBlock
->
info
;
tsdbRetrieveDataBlockInfo
(
reader
,
&
binfo
);
blockDataEnsureCapacity
(
pBlock
,
binfo
.
rows
);
pBlock
->
info
.
type
=
binfo
.
type
;
pBlock
->
info
.
uid
=
binfo
.
uid
;
pBlock
->
info
.
window
=
binfo
.
window
;
pBlock
->
info
.
rows
=
binfo
.
rows
;
int32_t
rows
=
0
;
tsdbRetrieveDataBlockInfo
(
reader
,
&
rows
,
&
pBlock
->
info
.
uid
,
&
pBlock
->
info
.
window
);
blockDataEnsureCapacity
(
pBlock
,
rows
);
pBlock
->
info
.
rows
=
rows
;
uint32_t
status
=
0
;
int32_t
code
=
loadDataBlockFromOneTable
(
pOperator
,
pTableScanInfo
,
readerIdx
,
pBlock
,
&
status
);
...
...
source/libs/function/CMakeLists.txt
浏览文件 @
92804d31
...
...
@@ -12,6 +12,17 @@ target_include_directories(
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
)
IF
(
TD_LINUX_64 AND JEMALLOC_ENABLED
)
ADD_DEFINITIONS
(
-DTD_JEMALLOC_ENABLED -I
${
CMAKE_BINARY_DIR
}
/build/include -L
${
CMAKE_BINARY_DIR
}
/build/lib -Wl,-rpath,
${
CMAKE_BINARY_DIR
}
/build/lib -ljemalloc
)
SET
(
LINK_JEMALLOC
"-L
${
CMAKE_BINARY_DIR
}
/build/lib -ljemalloc"
)
ELSE
()
SET
(
LINK_JEMALLOC
""
)
ENDIF
()
IF
(
TD_LINUX_64 AND JEMALLOC_ENABLED
)
ADD_DEPENDENCIES
(
function jemalloc
)
ENDIF
()
target_link_libraries
(
function
PRIVATE os
...
...
@@ -21,7 +32,7 @@ target_link_libraries(
PRIVATE qcom
PRIVATE scalar
PRIVATE transport
PRIVATE stream
PRIVATE stream
${
LINK_JEMALLOC
}
PUBLIC uv_a
)
...
...
@@ -37,10 +48,15 @@ target_include_directories(
"
${
TD_SOURCE_DIR
}
/include/os"
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
)
IF
(
TD_LINUX_64 AND JEMALLOC_ENABLED
)
ADD_DEPENDENCIES
(
runUdf jemalloc
)
ENDIF
()
target_link_libraries
(
runUdf
PUBLIC uv_a
PRIVATE os util common nodes function
PRIVATE os util common nodes function
${
LINK_JEMALLOC
}
)
add_library
(
udf1 STATIC MODULE test/udf1.c
)
...
...
@@ -54,8 +70,13 @@ target_include_directories(
"
${
TD_SOURCE_DIR
}
/include/os"
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
)
IF
(
TD_LINUX_64 AND JEMALLOC_ENABLED
)
ADD_DEPENDENCIES
(
udf1 jemalloc
)
ENDIF
()
target_link_libraries
(
udf1 PUBLIC os
)
udf1 PUBLIC os
${
LINK_JEMALLOC
}
)
add_library
(
udf2 STATIC MODULE test/udf2.c
)
target_include_directories
(
...
...
@@ -68,8 +89,13 @@ target_include_directories(
"
${
TD_SOURCE_DIR
}
/include/os"
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
)
IF
(
TD_LINUX_64 AND JEMALLOC_ENABLED
)
ADD_DEPENDENCIES
(
udf2 jemalloc
)
ENDIF
()
target_link_libraries
(
udf2 PUBLIC os
udf2 PUBLIC os
${
LINK_JEMALLOC
}
)
#SET(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR}/build/bin)
...
...
@@ -86,9 +112,13 @@ target_include_directories(
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
)
IF
(
TD_LINUX_64 AND JEMALLOC_ENABLED
)
ADD_DEPENDENCIES
(
udfd jemalloc
)
ENDIF
()
target_link_libraries
(
udfd
PUBLIC uv_a
PRIVATE os util common nodes function
)
udfd
PUBLIC uv_a
PRIVATE os util common nodes function
${
LINK_JEMALLOC
}
)
source/libs/function/src/builtinsimpl.c
浏览文件 @
92804d31
...
...
@@ -497,7 +497,7 @@ bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
return
true
;
}
static
FORCE_INLINE
int32_t
getNumOfElems
(
SqlFunctionCtx
*
pCtx
)
{
static
int32_t
getNumOfElems
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
numOfElem
=
0
;
/*
...
...
source/libs/function/src/functionMgt.c
浏览文件 @
92804d31
...
...
@@ -251,6 +251,13 @@ bool fmIsSelectValueFunc(int32_t funcId) {
return
FUNCTION_TYPE_SELECT_VALUE
==
funcMgtBuiltins
[
funcId
].
type
;
}
bool
fmIsGroupKeyFunc
(
int32_t
funcId
)
{
if
(
funcId
<
0
||
funcId
>=
funcMgtBuiltinsNum
)
{
return
false
;
}
return
FUNCTION_TYPE_GROUP_KEY
==
funcMgtBuiltins
[
funcId
].
type
;
}
void
fmFuncMgtDestroy
()
{
void
*
m
=
gFunMgtService
.
pFuncNameHashTable
;
if
(
m
!=
NULL
&&
atomic_val_compare_exchange_ptr
((
void
**
)
&
gFunMgtService
.
pFuncNameHashTable
,
m
,
0
)
==
m
)
{
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
92804d31
...
...
@@ -5160,7 +5160,7 @@ static int32_t translateCreateIndex(STranslateContext* pCxt, SCreateIndexStmt* p
static
int32_t
translateDropIndex
(
STranslateContext
*
pCxt
,
SDropIndexStmt
*
pStmt
)
{
SMDropSmaReq
dropSmaReq
=
{
0
};
SName
name
;
tNameExtractFullName
(
toName
(
pCxt
->
pParseCxt
->
acctId
,
p
Cxt
->
pParseCxt
->
db
,
pStmt
->
indexName
,
&
name
),
dropSmaReq
.
name
);
tNameExtractFullName
(
toName
(
pCxt
->
pParseCxt
->
acctId
,
p
Stmt
->
indexDbName
,
pStmt
->
indexName
,
&
name
),
dropSmaReq
.
name
);
dropSmaReq
.
igNotExists
=
pStmt
->
ignoreNotExists
;
return
buildCmdMsg
(
pCxt
,
TDMT_MND_DROP_SMA
,
(
FSerializeFunc
)
tSerializeSMDropSmaReq
,
&
dropSmaReq
);
}
...
...
source/libs/planner/src/planLogicCreater.c
浏览文件 @
92804d31
...
...
@@ -587,6 +587,10 @@ static int32_t createIndefRowsFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt
return
code
;
}
static
bool
isInterpFunc
(
int32_t
funcId
)
{
return
fmIsInterpFunc
(
funcId
)
||
fmIsInterpPseudoColumnFunc
(
funcId
)
||
fmIsGroupKeyFunc
(
funcId
);
}
static
int32_t
createInterpFuncLogicNode
(
SLogicPlanContext
*
pCxt
,
SSelectStmt
*
pSelect
,
SLogicNode
**
pLogicNode
)
{
if
(
!
pSelect
->
hasInterpFunc
)
{
return
TSDB_CODE_SUCCESS
;
...
...
@@ -602,7 +606,7 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
pInterpFunc
->
node
.
resultDataOrder
=
pInterpFunc
->
node
.
requireDataOrder
;
// interp functions and _group_key functions
int32_t
code
=
nodesCollectFuncs
(
pSelect
,
SQL_CLAUSE_SELECT
,
fmIsVector
Func
,
&
pInterpFunc
->
pFuncs
);
int32_t
code
=
nodesCollectFuncs
(
pSelect
,
SQL_CLAUSE_SELECT
,
isInterp
Func
,
&
pInterpFunc
->
pFuncs
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
rewriteExprsForSelect
(
pInterpFunc
->
pFuncs
,
pSelect
,
SQL_CLAUSE_SELECT
);
}
...
...
source/libs/qcom/src/queryUtil.c
浏览文件 @
92804d31
...
...
@@ -456,6 +456,7 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
(
*
pDst
)
->
vgHash
=
taosHashInit
(
taosHashGetSize
(
pSrc
->
vgHash
),
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
(
*
pDst
)
->
vgHash
)
{
taosMemoryFreeClear
(
*
pDst
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
...
...
source/libs/scalar/src/filter.c
浏览文件 @
92804d31
...
...
@@ -1087,7 +1087,7 @@ int32_t filterAddUnitImpl(SFilterInfo *info, uint8_t optr, SFilterFieldId *left,
if
(
tmp
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
info
->
units
=
tmp
;
info
->
units
=
(
SFilterUnit
*
)
tmp
;
memset
(
info
->
units
+
psize
,
0
,
sizeof
(
*
info
->
units
)
*
FILTER_DEFAULT_UNIT_SIZE
);
}
...
...
@@ -1633,12 +1633,12 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t options)
SValueNode
*
var
=
(
SValueNode
*
)
field
->
desc
;
SDataType
*
dType
=
&
var
->
node
.
resType
;
if
(
dType
->
type
==
TSDB_DATA_TYPE_VALUE_ARRAY
)
{
qDebug
(
"VAL%d => [type:TS][val:[%"
PRIi64
"] - [%"
PRId64
"]]"
,
i
,
*
(
int64_t
*
)
field
->
data
,
*
(((
int64_t
*
)
field
->
data
)
+
1
));
}
else
{
//
if (dType->type == TSDB_DATA_TYPE_VALUE_ARRAY) {
//
qDebug("VAL%d => [type:TS][val:[%" PRIi64 "] - [%" PRId64 "]]", i, *(int64_t *)field->data,
//
*(((int64_t *)field->data) + 1));
//
} else {
qDebug
(
"VAL%d => [type:%d][val:%"
PRIx64
"]"
,
i
,
dType
->
type
,
var
->
datum
.
i
);
// TODO
}
//
}
}
else
if
(
field
->
data
)
{
qDebug
(
"VAL%d => [type:NIL][val:NIL]"
,
i
);
// TODO
}
...
...
@@ -4059,11 +4059,13 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SC
SArray
*
pList
=
taosArrayInit
(
1
,
POINTER_BYTES
);
taosArrayPush
(
pList
,
&
pSrc
);
FLT_ERR_RET
(
scalarCalculate
(
info
->
sclCtx
.
node
,
pList
,
&
output
));
*
p
=
output
.
columnData
;
int32_t
code
=
scalarCalculate
(
info
->
sclCtx
.
node
,
pList
,
&
output
);
taosArrayDestroy
(
pList
);
FLT_ERR_RET
(
code
);
*
p
=
output
.
columnData
;
if
(
output
.
numOfQualified
==
output
.
numOfRows
)
{
*
pResultStatus
=
FILTER_RESULT_ALL_QUALIFIED
;
}
else
if
(
output
.
numOfQualified
==
0
)
{
...
...
source/libs/scalar/src/scalar.c
浏览文件 @
92804d31
...
...
@@ -896,6 +896,10 @@ int32_t sclExecCaseWhen(SCaseWhenNode *node, SScalarCtx *ctx, SScalarParam *outp
SCL_ERR_JRET
(
sclGetNodeRes
(
pWhenThen
->
pWhen
,
ctx
,
&
pWhen
));
SCL_ERR_JRET
(
sclGetNodeRes
(
pWhenThen
->
pThen
,
ctx
,
&
pThen
));
if
(
NULL
==
pWhen
||
NULL
==
pThen
)
{
sclError
(
"invalid when/then in whenThen list"
);
SCL_ERR_JRET
(
TSDB_CODE_INVALID_PARA
);
}
if
(
pCase
)
{
vectorCompare
(
pCase
,
pWhen
,
&
comp
,
TSDB_ORDER_ASC
,
OP_TYPE_EQUAL
);
...
...
source/libs/wal/src/walMeta.c
浏览文件 @
92804d31
...
...
@@ -288,6 +288,33 @@ void walAlignVersions(SWal* pWal) {
pWal
->
vers
.
appliedVer
=
TMIN
(
pWal
->
vers
.
commitVer
,
pWal
->
vers
.
appliedVer
);
}
bool
walLogEntriesComplete
(
const
SWal
*
pWal
)
{
int32_t
sz
=
taosArrayGetSize
(
pWal
->
fileInfoSet
);
bool
complete
=
true
;
int32_t
fileIdx
=
-
1
;
int64_t
index
=
pWal
->
vers
.
firstVer
;
while
(
++
fileIdx
<
sz
)
{
SWalFileInfo
*
pFileInfo
=
taosArrayGet
(
pWal
->
fileInfoSet
,
fileIdx
);
if
(
pFileInfo
->
firstVer
!=
index
)
{
break
;
}
index
=
pFileInfo
->
lastVer
+
((
fileIdx
+
1
<
sz
)
?
1
:
0
);
}
// empty is regarded as complete
if
(
sz
!=
0
)
{
complete
=
(
index
==
pWal
->
vers
.
lastVer
);
}
if
(
!
complete
)
{
wError
(
"vgId:%d, WAL log entries incomplete in range [%"
PRId64
", %"
PRId64
"], aligned with snaphotVer:%"
PRId64
,
pWal
->
cfg
.
vgId
,
pWal
->
vers
.
firstVer
,
pWal
->
vers
.
lastVer
,
pWal
->
vers
.
snapshotVer
);
terrno
=
TSDB_CODE_WAL_LOG_INCOMPLETE
;
}
return
complete
;
}
int
walCheckAndRepairMeta
(
SWal
*
pWal
)
{
// load log files, get first/snapshot/last version info
const
char
*
logPattern
=
"^[0-9]+.log$"
;
...
...
@@ -401,6 +428,11 @@ int walCheckAndRepairMeta(SWal* pWal) {
if
(
updateMeta
)
{
(
void
)
walSaveMeta
(
pWal
);
}
if
(
!
walLogEntriesComplete
(
pWal
))
{
return
-
1
;
}
return
0
;
}
...
...
source/libs/wal/src/walWrite.c
浏览文件 @
92804d31
...
...
@@ -101,7 +101,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
taosThreadMutexLock
(
&
pWal
->
mutex
);
int64_t
code
;
char
fnameStr
[
WAL_FILE_LEN
];
if
(
ver
>
pWal
->
vers
.
lastVer
||
ver
<
pWal
->
vers
.
commitVer
)
{
if
(
ver
>
pWal
->
vers
.
lastVer
||
ver
<
pWal
->
vers
.
commitVer
||
ver
<=
pWal
->
vers
.
snapshotVer
)
{
terrno
=
TSDB_CODE_WAL_INVALID_VER
;
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
return
-
1
;
...
...
source/libs/wal/test/walMetaTest.cpp
浏览文件 @
92804d31
...
...
@@ -274,17 +274,17 @@ TEST_F(WalCleanEnv, rollbackMultiFile) {
code
=
walRollback
(
pWal
,
9
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
pWal
->
vers
.
lastVer
,
8
);
code
=
walRollback
(
pWal
,
5
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
pWal
->
vers
.
lastVer
,
4
);
code
=
walRollback
(
pWal
,
3
);
code
=
walRollback
(
pWal
,
6
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
pWal
->
vers
.
lastVer
,
5
);
code
=
walRollback
(
pWal
,
5
);
ASSERT_EQ
(
code
,
-
1
);
ASSERT_EQ
(
pWal
->
vers
.
lastVer
,
2
);
ASSERT_EQ
(
pWal
->
vers
.
lastVer
,
5
);
code
=
walWrite
(
pWal
,
3
,
3
,
(
void
*
)
ranStr
,
ranStrLen
);
code
=
walWrite
(
pWal
,
6
,
6
,
(
void
*
)
ranStr
,
ranStrLen
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
pWal
->
vers
.
lastVer
,
3
);
ASSERT_EQ
(
pWal
->
vers
.
lastVer
,
6
);
code
=
walSaveMeta
(
pWal
);
ASSERT_EQ
(
code
,
0
);
...
...
source/util/src/terror.c
浏览文件 @
92804d31
...
...
@@ -452,6 +452,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_WAL_INVALID_VER, "WAL use invalid versi
TAOS_DEFINE_ERROR
(
TSDB_CODE_WAL_OUT_OF_MEMORY
,
"WAL out of memory"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_WAL_LOG_NOT_EXIST
,
"WAL log not exist"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_WAL_CHKSUM_MISMATCH
,
"WAL checksum mismatch"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_WAL_LOG_INCOMPLETE
,
"WAL log incomplete"
)
// tfs
TAOS_DEFINE_ERROR
(
TSDB_CODE_FS_INVLD_CFG
,
"tfs invalid mount config"
)
...
...
tests/script/jenkins/basic.txt
浏览文件 @
92804d31
...
...
@@ -322,7 +322,7 @@
# --- vnode ----
./test.sh -f tsim/vnode/replica3_basic.sim
./test.sh -f tsim/vnode/replica3_repeat.sim
# TD-20089
./test.sh -f tsim/vnode/replica3_repeat.sim
./test.sh -f tsim/vnode/replica3_vgroup.sim
./test.sh -f tsim/vnode/replica3_many.sim
./test.sh -f tsim/vnode/replica3_import.sim
...
...
tests/system-test/7-tmq/tmqDnodeRestart1.py
浏览文件 @
92804d31
...
...
@@ -21,7 +21,7 @@ class TDTestCase:
self
.
ctbNum
=
100
self
.
rowsPerTbl
=
1000
def
init
(
self
,
conn
,
logSql
):
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
(),
False
)
...
...
tools/shell/CMakeLists.txt
浏览文件 @
92804d31
...
...
@@ -2,6 +2,14 @@ aux_source_directory(src SHELL_SRC)
add_executable
(
shell
${
SHELL_SRC
}
)
IF
(
TD_LINUX_64 AND JEMALLOC_ENABLED
)
ADD_DEFINITIONS
(
-DTD_JEMALLOC_ENABLED -I
${
CMAKE_BINARY_DIR
}
/build/include -L
${
CMAKE_BINARY_DIR
}
/build/lib -Wl,-rpath,
${
CMAKE_BINARY_DIR
}
/build/lib -ljemalloc
)
SET
(
LINK_JEMALLOC
"-L
${
CMAKE_BINARY_DIR
}
/build/lib -ljemalloc"
)
ADD_DEPENDENCIES
(
shell jemalloc
)
ELSE
()
SET
(
LINK_JEMALLOC
""
)
ENDIF
()
IF
(
TD_LINUX AND TD_WEBSOCKET
)
ADD_DEFINITIONS
(
-DWEBSOCKET -I
${
CMAKE_BINARY_DIR
}
/build/include -ltaosws
)
SET
(
LINK_WEBSOCKET
"-L
${
CMAKE_BINARY_DIR
}
/build/lib -ltaosws"
)
...
...
@@ -21,7 +29,7 @@ ENDIF ()
if
(
TD_WINDOWS
)
target_link_libraries
(
shell PUBLIC taos_static
${
LINK_WEBSOCKET
}
)
else
()
target_link_libraries
(
shell PUBLIC taos
${
LINK_WEBSOCKET
}
)
target_link_libraries
(
shell PUBLIC taos
${
LINK_WEBSOCKET
}
${
LINK_JEMALLOC
}
)
endif
()
target_link_libraries
(
shell
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录