Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
35adca61
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
35adca61
编写于
1月 06, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[td-11818]add error check for hash.
上级
29fe08d1
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
60 addition
and
39 deletion
+60
-39
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+8
-1
source/libs/parser/src/dCDAstProcess.c
source/libs/parser/src/dCDAstProcess.c
+39
-25
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+2
-2
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+11
-11
未找到文件。
source/libs/catalog/src/catalog.c
浏览文件 @
35adca61
...
...
@@ -311,6 +311,14 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName
if
(
NULL
==
vgInfo
)
{
ctgError
(
"no hash range found for hashvalue[%u]"
,
hashValue
);
void
*
pIter1
=
taosHashIterate
(
dbInfo
->
vgInfo
,
NULL
);
while
(
pIter1
)
{
vgInfo
=
pIter1
;
ctgDebug
(
"valid range:[%d, %d]"
,
vgInfo
->
hashBegin
,
vgInfo
->
hashEnd
);
pIter1
=
taosHashIterate
(
dbInfo
->
vgInfo
,
pIter1
);
}
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
...
...
@@ -773,7 +781,6 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter,
CTG_ERR_JRET
(
ctgGetVgInfoFromHashValue
(
dbInfo
,
pTableName
,
pVgroup
));
_return:
if
(
dbInfo
)
{
CTG_UNLOCK
(
CTG_READ
,
&
dbInfo
->
lock
);
taosHashRelease
(
pCatalog
->
dbCache
.
cache
,
dbInfo
);
...
...
source/libs/parser/src/dCDAstProcess.c
浏览文件 @
35adca61
...
...
@@ -339,7 +339,6 @@ static int32_t doParseSerializeTagValue(SSchema* pTagSchema, int32_t numOfInputT
code
=
parseValueToken
(
&
endPtr
,
pItem
,
pSchema
,
tsPrecision
,
tmpTokenBuf
,
KvRowAppend
,
&
param
,
pMsgBuf
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tdDestroyKVRowBuilder
(
pKvRowBuilder
);
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg1
);
}
}
...
...
@@ -393,6 +392,9 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
const
char
*
msg3
=
"tag value too long"
;
const
char
*
msg4
=
"illegal value or data overflow"
;
int32_t
code
=
0
;
STableMeta
*
pSuperTableMeta
=
NULL
;
SHashObj
*
pVgroupHashmap
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
,
HASH_NO_LOCK
);
// super table name, create table by using dst
...
...
@@ -401,29 +403,30 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
SCreatedTableInfo
*
pCreateTableInfo
=
taosArrayGet
(
pCreateTable
->
childTableInfo
,
j
);
SToken
*
pSTableNameToken
=
&
pCreateTableInfo
->
stbName
;
int32_t
code
=
parserValidateNameToken
(
pSTableNameToken
);
code
=
parserValidateNameToken
(
pSTableNameToken
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg1
);
code
=
buildInvalidOperationMsg
(
pMsgBuf
,
msg1
);
goto
_error
;
}
SName
name
=
{
0
};
code
=
createSName
(
&
name
,
pSTableNameToken
,
pCtx
,
pMsgBuf
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
goto
_error
;
}
SKVRowBuilder
kvRowBuilder
=
{
0
};
if
(
tdInitKVRowBuilder
(
&
kvRowBuilder
)
<
0
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
_error
;
}
SArray
*
pValList
=
pCreateTableInfo
->
pTagVals
;
size_t
numOfInputTag
=
taosArrayGetSize
(
pValList
);
STableMeta
*
pSuperTableMeta
=
NULL
;
code
=
catalogGetTableMeta
(
pCtx
->
pCatalog
,
pCtx
->
pTransporter
,
&
pCtx
->
mgmtEpSet
,
&
name
,
&
pSuperTableMeta
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
goto
_error
;
}
assert
(
pSuperTableMeta
!=
NULL
);
...
...
@@ -442,8 +445,8 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
if
(
numOfInputTag
!=
numOfBoundTags
||
schemaSize
<
numOfInputTag
)
{
tdDestroyKVRowBuilder
(
&
kvRowBuilder
);
tfree
(
pSuperTableMeta
);
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg2
)
;
code
=
buildInvalidOperationMsg
(
pMsgBuf
,
msg2
);
goto
_error
;
}
bool
findColumnIndex
=
false
;
...
...
@@ -475,8 +478,8 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
if
(
pSchema
->
type
==
TSDB_DATA_TYPE_BINARY
||
pSchema
->
type
==
TSDB_DATA_TYPE_NCHAR
)
{
if
(
pItem
->
pVar
.
nLen
>
pSchema
->
bytes
)
{
tdDestroyKVRowBuilder
(
&
kvRowBuilder
);
tfree
(
pSuperTableMeta
);
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg3
)
;
code
=
buildInvalidOperationMsg
(
pMsgBuf
,
msg3
);
goto
_error
;
}
}
else
if
(
pSchema
->
type
==
TSDB_DATA_TYPE_TIMESTAMP
)
{
if
(
pItem
->
pVar
.
nType
==
TSDB_DATA_TYPE_BINARY
)
{
...
...
@@ -492,19 +495,19 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
code
=
taosVariantDump
(
&
(
pItem
->
pVar
),
tagVal
,
pSchema
->
type
,
true
);
// check again after the convert since it may be converted from binary to nchar.
if
(
pSchema
->
type
==
TSDB_DATA_TYPE_BINARY
||
pSchema
->
type
==
TSDB_DATA_TYPE_NCHAR
)
{
if
(
IS_VAR_DATA_TYPE
(
pSchema
->
type
)
)
{
int16_t
len
=
varDataTLen
(
tagVal
);
if
(
len
>
pSchema
->
bytes
)
{
tdDestroyKVRowBuilder
(
&
kvRowBuilder
);
tfree
(
pSuperTableMeta
);
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg3
)
;
code
=
buildInvalidOperationMsg
(
pMsgBuf
,
msg3
);
goto
_error
;
}
}
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tdDestroyKVRowBuilder
(
&
kvRowBuilder
);
tfree
(
pSuperTableMeta
);
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg4
)
;
code
=
buildInvalidOperationMsg
(
pMsgBuf
,
msg4
);
goto
_error
;
}
tdAddColToKVRow
(
&
kvRowBuilder
,
pSchema
->
colId
,
pSchema
->
type
,
tagVal
);
...
...
@@ -522,23 +525,22 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
}
else
{
if
(
schemaSize
!=
numOfInputTag
)
{
tdDestroyKVRowBuilder
(
&
kvRowBuilder
);
tfree
(
pSuperTableMeta
);
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg2
)
;
code
=
buildInvalidOperationMsg
(
pMsgBuf
,
msg2
);
goto
_error
;
}
code
=
doParseSerializeTagValue
(
pTagSchema
,
numOfInputTag
,
&
kvRowBuilder
,
pValList
,
tinfo
.
precision
,
pMsgBuf
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tdDestroyKVRowBuilder
(
&
kvRowBuilder
);
tfree
(
pSuperTableMeta
);
return
code
;
goto
_error
;
}
}
SKVRow
row
=
tdGetKVRowFromBuilder
(
&
kvRowBuilder
);
tdDestroyKVRowBuilder
(
&
kvRowBuilder
);
if
(
row
==
NULL
)
{
tfree
(
pSuperTableMeta
)
;
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
goto
_error
;
}
tdSortKVRowByColIdx
(
row
);
...
...
@@ -546,22 +548,34 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
SName
tableName
=
{
0
};
code
=
createSName
(
&
tableName
,
&
pCreateTableInfo
->
name
,
pCtx
,
pMsgBuf
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tfree
(
pSuperTableMeta
);
return
code
;
goto
_error
;
}
// Find a appropriate vgroup to accommodate this table , according to the table name
SVgroupInfo
info
=
{
0
};
catalogGetTableHashVgroup
(
pCtx
->
pCatalog
,
pCtx
->
pTransporter
,
&
pCtx
->
mgmtEpSet
,
&
tableName
,
&
info
);
code
=
catalogGetTableHashVgroup
(
pCtx
->
pCatalog
,
pCtx
->
pTransporter
,
&
pCtx
->
mgmtEpSet
,
&
tableName
,
&
info
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
addCreateTbReqIntoVgroup
(
pVgroupHashmap
,
&
tableName
,
row
,
pSuperTableMeta
->
uid
,
&
info
);
tfree
(
pSuperTableMeta
);
}
*
pBufArray
=
doSerializeVgroupCreateTableInfo
(
pVgroupHashmap
);
if
(
*
pBufArray
==
NULL
)
{
code
=
terrno
;
goto
_error
;
}
taosHashCleanup
(
pVgroupHashmap
);
return
TSDB_CODE_SUCCESS
;
_error:
taosHashCleanup
(
pVgroupHashmap
);
tfree
(
pSuperTableMeta
);
terrno
=
code
;
return
code
;
}
static
int32_t
serializeVgroupTablesBatchImpl
(
SVgroupTablesBatch
*
pTbBatch
,
SArray
*
pBufArray
)
{
...
...
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
35adca61
...
...
@@ -67,8 +67,8 @@ typedef struct SSchTask {
int32_t
msgLen
;
// msg length
int8_t
status
;
// task status
SQueryNodeAddr
execAddr
;
// task actual executed node address
int8_t
c
o
ndidateIdx
;
// current try condidation index
SArray
*
c
o
ndidateAddrs
;
// condidate node addresses, element is SQueryNodeAddr
int8_t
c
a
ndidateIdx
;
// current try condidation index
SArray
*
c
a
ndidateAddrs
;
// condidate node addresses, element is SQueryNodeAddr
SQueryProfileSummary
summary
;
// task execution summary
int32_t
childReady
;
// child task ready number
SArray
*
children
;
// the datasource tasks,from which to fetch the result, element is SQueryTask*
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
35adca61
...
...
@@ -109,7 +109,7 @@ static SSchTask initTask(SSchJob* pJob, SSubplan* plan, SSchLevel *pLevel) {
}
static
void
cleanupTask
(
SSchTask
*
pTask
)
{
taosArrayDestroy
(
pTask
->
c
o
ndidateAddrs
);
taosArrayDestroy
(
pTask
->
c
a
ndidateAddrs
);
}
int32_t
schValidateAndBuildJob
(
SQueryDag
*
dag
,
SSchJob
*
pJob
)
{
...
...
@@ -226,20 +226,20 @@ _return:
SCH_RET
(
code
);
}
int32_t
schSetTaskC
o
ndidateAddrs
(
SSchJob
*
job
,
SSchTask
*
task
)
{
if
(
task
->
c
o
ndidateAddrs
)
{
int32_t
schSetTaskC
a
ndidateAddrs
(
SSchJob
*
job
,
SSchTask
*
task
)
{
if
(
task
->
c
a
ndidateAddrs
)
{
return
TSDB_CODE_SUCCESS
;
}
task
->
c
o
ndidateIdx
=
0
;
task
->
c
o
ndidateAddrs
=
taosArrayInit
(
SCH_MAX_CONDIDATE_EP_NUM
,
sizeof
(
SQueryNodeAddr
));
if
(
NULL
==
task
->
c
o
ndidateAddrs
)
{
task
->
c
a
ndidateIdx
=
0
;
task
->
c
a
ndidateAddrs
=
taosArrayInit
(
SCH_MAX_CONDIDATE_EP_NUM
,
sizeof
(
SQueryNodeAddr
));
if
(
NULL
==
task
->
c
a
ndidateAddrs
)
{
qError
(
"taosArrayInit failed"
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
if
(
task
->
plan
->
execNode
.
numOfEps
>
0
)
{
if
(
NULL
==
taosArrayPush
(
task
->
c
o
ndidateAddrs
,
&
task
->
plan
->
execNode
))
{
if
(
NULL
==
taosArrayPush
(
task
->
c
a
ndidateAddrs
,
&
task
->
plan
->
execNode
))
{
qError
(
"taosArrayPush failed"
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -253,7 +253,7 @@ int32_t schSetTaskCondidateAddrs(SSchJob *job, SSchTask *task) {
for
(
int32_t
i
=
0
;
i
<
nodeNum
&&
addNum
<
SCH_MAX_CONDIDATE_EP_NUM
;
++
i
)
{
SQueryNodeAddr
*
naddr
=
taosArrayGet
(
job
->
nodeList
,
i
);
if
(
NULL
==
taosArrayPush
(
task
->
c
o
ndidateAddrs
,
&
task
->
plan
->
execNode
))
{
if
(
NULL
==
taosArrayPush
(
task
->
c
a
ndidateAddrs
,
&
task
->
plan
->
execNode
))
{
qError
(
"taosArrayPush failed"
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -798,7 +798,7 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
}
SEpSet
epSet
;
SQueryNodeAddr
*
addr
=
taosArrayGet
(
task
->
c
ondidateAddrs
,
task
->
co
ndidateIdx
);
SQueryNodeAddr
*
addr
=
taosArrayGet
(
task
->
c
andidateAddrs
,
task
->
ca
ndidateIdx
);
schConvertAddrToEpSet
(
addr
,
&
epSet
);
...
...
@@ -816,9 +816,9 @@ _return:
int32_t
schLaunchTask
(
SSchJob
*
job
,
SSchTask
*
task
)
{
SSubplan
*
plan
=
task
->
plan
;
SCH_ERR_RET
(
qSubPlanToString
(
plan
,
&
task
->
msg
,
&
task
->
msgLen
));
SCH_ERR_RET
(
schSetTaskC
o
ndidateAddrs
(
job
,
task
));
SCH_ERR_RET
(
schSetTaskC
a
ndidateAddrs
(
job
,
task
));
if
(
NULL
==
task
->
c
ondidateAddrs
||
taosArrayGetSize
(
task
->
co
ndidateAddrs
)
<=
0
)
{
if
(
NULL
==
task
->
c
andidateAddrs
||
taosArrayGetSize
(
task
->
ca
ndidateAddrs
)
<=
0
)
{
SCH_TASK_ERR_LOG
(
"no valid condidate node for task:%"
PRIx64
,
task
->
taskId
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录