Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d56dc950
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d56dc950
编写于
11月 10, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix(stream): partition tbname reset to null
上级
4000176d
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
31 addition
and
28 deletion
+31
-28
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+4
-1
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+27
-27
未找到文件。
source/libs/executor/src/groupoperator.c
浏览文件 @
d56dc950
...
...
@@ -909,7 +909,10 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
void
*
pData
=
colDataGetVarData
(
pCol
,
0
);
// TODO check tbname validity
if
(
pData
!=
(
void
*
)
-
1
)
{
memcpy
(
pDest
->
info
.
parTbName
,
varDataVal
(
pData
),
varDataLen
(
pData
));
memset
(
pDest
->
info
.
parTbName
,
0
,
TSDB_TABLE_NAME_LEN
);
int32_t
len
=
TMIN
(
varDataLen
(
pData
),
TSDB_TABLE_NAME_LEN
);
memcpy
(
pDest
->
info
.
parTbName
,
varDataVal
(
pData
),
len
);
/*pDest->info.parTbName[len + 1] = 0;*/
}
else
{
pDest
->
info
.
parTbName
[
0
]
=
0
;
}
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
d56dc950
...
...
@@ -284,7 +284,7 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo*
}
static
bool
doFilterByBlockSMA
(
SFilterInfo
*
pFilterInfo
,
SColumnDataAgg
**
pColsAgg
,
int32_t
numOfCols
,
int32_t
numOfRows
)
{
int32_t
numOfRows
)
{
if
(
pColsAgg
==
NULL
||
pFilterInfo
==
NULL
)
{
return
true
;
}
...
...
@@ -345,7 +345,7 @@ static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlo
// todo handle the slimit info
void
applyLimitOffset
(
SLimitInfo
*
pLimitInfo
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
,
SOperatorInfo
*
pOperator
)
{
SLimit
*
pLimit
=
&
pLimitInfo
->
limit
;
SLimit
*
pLimit
=
&
pLimitInfo
->
limit
;
const
char
*
id
=
GET_TASKID
(
pTaskInfo
);
if
(
pLimit
->
offset
>
0
&&
pLimitInfo
->
remainOffset
>
0
)
{
...
...
@@ -499,7 +499,7 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
typedef
struct
STableCachedVal
{
const
char
*
pName
;
STag
*
pTags
;
STag
*
pTags
;
}
STableCachedVal
;
static
void
freeTableCachedVal
(
void
*
param
)
{
...
...
@@ -513,13 +513,11 @@ static void freeTableCachedVal(void* param) {
taosMemoryFree
(
pVal
);
}
//const void *key, size_t keyLen, void *value
static
void
freeCachedMetaItem
(
const
void
*
key
,
size_t
keyLen
,
void
*
value
)
{
freeTableCachedVal
(
value
);
}
// const void *key, size_t keyLen, void *value
static
void
freeCachedMetaItem
(
const
void
*
key
,
size_t
keyLen
,
void
*
value
)
{
freeTableCachedVal
(
value
);
}
int32_t
addTagPseudoColumnData
(
SReadHandle
*
pHandle
,
const
SExprInfo
*
pExpr
,
int32_t
numOfExpr
,
SSDataBlock
*
pBlock
,
int32_t
rows
,
const
char
*
idStr
,
STableMetaCacheInfo
*
pCache
)
{
int32_t
addTagPseudoColumnData
(
SReadHandle
*
pHandle
,
const
SExprInfo
*
pExpr
,
int32_t
numOfExpr
,
SSDataBlock
*
pBlock
,
int32_t
rows
,
const
char
*
idStr
,
STableMetaCacheInfo
*
pCache
)
{
// currently only the tbname pseudo column
if
(
numOfExpr
<=
0
)
{
return
TSDB_CODE_SUCCESS
;
...
...
@@ -531,11 +529,11 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
int32_t
backupRows
=
pBlock
->
info
.
rows
;
pBlock
->
info
.
rows
=
rows
;
bool
freeReader
=
false
;
bool
freeReader
=
false
;
STableCachedVal
val
=
{
0
};
SMetaReader
mr
=
{
0
};
LRUHandle
*
h
=
NULL
;
LRUHandle
*
h
=
NULL
;
// 1. check if it is existed in meta cache
if
(
pCache
==
NULL
)
{
...
...
@@ -582,7 +580,8 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
val
=
*
pVal
;
freeReader
=
true
;
int32_t
ret
=
taosLRUCacheInsert
(
pCache
->
pTableMetaEntryCache
,
&
pBlock
->
info
.
uid
,
sizeof
(
uint64_t
),
pVal
,
sizeof
(
STableCachedVal
),
freeCachedMetaItem
,
NULL
,
TAOS_LRU_PRIORITY_LOW
);
int32_t
ret
=
taosLRUCacheInsert
(
pCache
->
pTableMetaEntryCache
,
&
pBlock
->
info
.
uid
,
sizeof
(
uint64_t
),
pVal
,
sizeof
(
STableCachedVal
),
freeCachedMetaItem
,
NULL
,
TAOS_LRU_PRIORITY_LOW
);
if
(
ret
!=
TAOS_LRU_STATUS_OK
)
{
qError
(
"failed to put meta into lru cache, code:%d, %s"
,
ret
,
idStr
);
freeTableCachedVal
(
pVal
);
...
...
@@ -594,13 +593,13 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
taosLRUCacheRelease
(
pCache
->
pTableMetaEntryCache
,
h
,
false
);
}
qDebug
(
"retrieve table meta from cache:%"
PRIu64
", hit:%"
PRIu64
" miss:%"
PRIu64
", %s"
,
pCache
->
metaFetch
,
pCache
->
cacheHit
,
(
pCache
->
metaFetch
-
pCache
->
cacheHit
),
idStr
);
qDebug
(
"retrieve table meta from cache:%"
PRIu64
", hit:%"
PRIu64
" miss:%"
PRIu64
", %s"
,
pCache
->
metaFetch
,
pCache
->
cacheHit
,
(
pCache
->
metaFetch
-
pCache
->
cacheHit
),
idStr
);
}
for
(
int32_t
j
=
0
;
j
<
numOfExpr
;
++
j
)
{
const
SExprInfo
*
pExpr1
=
&
pExpr
[
j
];
int32_t
dstSlotId
=
pExpr1
->
base
.
resSchema
.
slotId
;
int32_t
dstSlotId
=
pExpr1
->
base
.
resSchema
.
slotId
;
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
dstSlotId
);
colInfoDataCleanup
(
pColInfoData
,
pBlock
->
info
.
rows
);
...
...
@@ -652,7 +651,7 @@ void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData,
fmGetScalarFuncExecFuncs
(
functionId
,
&
fpSet
);
size_t
len
=
TSDB_TABLE_FNAME_LEN
+
VARSTR_HEADER_SIZE
;
char
buf
[
TSDB_TABLE_FNAME_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
char
buf
[
TSDB_TABLE_FNAME_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
STR_TO_VARSTR
(
buf
,
name
)
SColumnInfoData
infoData
=
createColumnInfoData
(
TSDB_DATA_TYPE_VARCHAR
,
len
,
1
);
...
...
@@ -904,12 +903,12 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
goto
_error
;
}
SScanPhysiNode
*
pScanNode
=
&
pTableScanNode
->
scan
;
SScanPhysiNode
*
pScanNode
=
&
pTableScanNode
->
scan
;
SDataBlockDescNode
*
pDescNode
=
pScanNode
->
node
.
pOutputDataBlockDesc
;
int32_t
numOfCols
=
0
;
int32_t
code
=
extractColMatchInfo
(
pScanNode
->
pScanCols
,
pDescNode
,
&
numOfCols
,
COL_MATCH_FROM_COL_ID
,
&
pInfo
->
matchInfo
);
int32_t
code
=
extractColMatchInfo
(
pScanNode
->
pScanCols
,
pDescNode
,
&
numOfCols
,
COL_MATCH_FROM_COL_ID
,
&
pInfo
->
matchInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
@@ -955,7 +954,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pInfo
->
metaCache
.
pTableMetaEntryCache
=
taosLRUCacheInit
(
1024
*
128
,
-
1
,
.
5
);
pInfo
->
metaCache
.
pTableMetaEntryCache
=
taosLRUCacheInit
(
1024
*
128
,
-
1
,
.
5
);
if
(
pInfo
->
metaCache
.
pTableMetaEntryCache
==
NULL
)
{
code
=
terrno
;
goto
_error
;
...
...
@@ -1049,8 +1048,8 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
STableBlockDistInfo
blockDistInfo
=
{.
minRows
=
INT_MAX
,
.
maxRows
=
INT_MIN
};
int32_t
code
=
doGetTableRowSize
(
pBlockScanInfo
->
readHandle
.
meta
,
pBlockScanInfo
->
uid
,
(
int32_t
*
)
&
blockDistInfo
.
rowSize
,
GET_TASKID
(
pTaskInfo
));
int32_t
code
=
doGetTableRowSize
(
pBlockScanInfo
->
readHandle
.
meta
,
pBlockScanInfo
->
uid
,
(
int32_t
*
)
&
blockDistInfo
.
rowSize
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
...
...
@@ -1599,8 +1598,10 @@ static void calBlockTbName(SExprSupp* pTbNameCalSup, SSDataBlock* pBlock) {
void
*
pData
=
colDataGetData
(
pCol
,
0
);
// TODO check tbname validation
if
(
pData
!=
(
void
*
)
-
1
&&
pData
!=
NULL
)
{
memcpy
(
pBlock
->
info
.
parTbName
,
varDataVal
(
pData
),
TMIN
(
varDataLen
(
pData
),
TSDB_TABLE_NAME_LEN
));
pBlock
->
info
.
parTbName
[
TSDB_TABLE_NAME_LEN
-
1
]
=
0
;
memset
(
pBlock
->
info
.
parTbName
,
0
,
TSDB_TABLE_NAME_LEN
);
int32_t
len
=
TMIN
(
varDataLen
(
pData
),
TSDB_TABLE_NAME_LEN
);
memcpy
(
pBlock
->
info
.
parTbName
,
varDataVal
(
pData
),
len
);
/*pBlock->info.parTbName[len + 1] = 0;*/
}
else
{
pBlock
->
info
.
parTbName
[
0
]
=
0
;
}
...
...
@@ -4319,7 +4320,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
int32_t
numOfExprs
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pPhyNode
->
pScanPseudoCols
,
NULL
,
&
numOfExprs
);
int32_t
code
=
initExprSupp
(
&
pOperator
->
exprSupp
,
pExprInfo
,
numOfExprs
);
int32_t
code
=
initExprSupp
(
&
pOperator
->
exprSupp
,
pExprInfo
,
numOfExprs
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
@@ -4455,7 +4456,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
if
(
pOperator
->
exprSupp
.
pFilterInfo
!=
NULL
)
{
if
(
pOperator
->
exprSupp
.
pFilterInfo
!=
NULL
)
{
int64_t
st
=
taosGetTimestampMs
();
doFilter
(
pBlock
,
pOperator
->
exprSupp
.
pFilterInfo
,
&
pTableScanInfo
->
matchInfo
);
...
...
@@ -4831,7 +4832,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo
->
sample
.
seed
=
taosGetTimestampSec
();
pInfo
->
dataBlockLoadFlag
=
pTableScanNode
->
dataRequired
;
code
=
filterInitFromNode
((
SNode
*
)
pTableScanNode
->
scan
.
node
.
pConditions
,
&
pOperator
->
exprSupp
.
pFilterInfo
,
0
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录