Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a1e1c089
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
a1e1c089
编写于
4月 29, 2022
作者:
X
Xiaoyu Wang
提交者:
GitHub
4月 29, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #12049 from taosdata/feature/vnode_refact1_droptable_wxy
feat: sql command 'drop table'
上级
18a12ae3
8818ed48
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
214 addition
and
29 deletion
+214
-29
include/common/tmsg.h
include/common/tmsg.h
+1
-0
include/util/taoserror.h
include/util/taoserror.h
+1
-0
source/common/src/tmsg.c
source/common/src/tmsg.c
+2
-0
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+155
-16
source/libs/parser/src/parUtil.c
source/libs/parser/src/parUtil.c
+2
-0
source/libs/planner/src/planLogicCreater.c
source/libs/planner/src/planLogicCreater.c
+10
-3
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+43
-10
未找到文件。
include/common/tmsg.h
浏览文件 @
a1e1c089
...
...
@@ -1605,6 +1605,7 @@ int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatc
// TDMT_VND_DROP_TABLE =================
typedef
struct
{
const
char
*
name
;
int8_t
igNotExists
;
}
SVDropTbReq
;
typedef
struct
{
...
...
include/util/taoserror.h
浏览文件 @
a1e1c089
...
...
@@ -616,6 +616,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_OFFSET_LESS_ZERO TAOS_DEF_ERROR_CODE(0, 0x2637)
#define TSDB_CODE_PAR_SLIMIT_LEAK_PARTITION_BY TAOS_DEF_ERROR_CODE(0, 0x2638)
#define TSDB_CODE_PAR_INVALID_TOPIC_QUERY TAOS_DEF_ERROR_CODE(0, 0x2639)
#define TSDB_CODE_PAR_INVALID_DROP_STABLE TAOS_DEF_ERROR_CODE(0, 0x263A)
//planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
...
...
source/common/src/tmsg.c
浏览文件 @
a1e1c089
...
...
@@ -3800,6 +3800,7 @@ static int32_t tEncodeSVDropTbReq(SCoder *pCoder, const SVDropTbReq *pReq) {
if
(
tStartEncode
(
pCoder
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pCoder
,
pReq
->
name
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pCoder
,
pReq
->
igNotExists
)
<
0
)
return
-
1
;
tEndEncode
(
pCoder
);
return
0
;
...
...
@@ -3809,6 +3810,7 @@ static int32_t tDecodeSVDropTbReq(SCoder *pCoder, SVDropTbReq *pReq) {
if
(
tStartDecode
(
pCoder
)
<
0
)
return
-
1
;
if
(
tDecodeCStr
(
pCoder
,
&
pReq
->
name
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pCoder
,
&
pReq
->
igNotExists
)
<
0
)
return
-
1
;
tEndDecode
(
pCoder
);
return
0
;
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
a1e1c089
...
...
@@ -3134,11 +3134,11 @@ static int32_t rewriteShow(STranslateContext* pCxt, SQuery* pQuery) {
return
code
;
}
typedef
struct
SVgroup
Tables
Batch
{
typedef
struct
SVgroup
CreateTable
Batch
{
SVCreateTbBatchReq
req
;
SVgroupInfo
info
;
char
dbName
[
TSDB_DB_NAME_LEN
];
}
SVgroup
Tables
Batch
;
}
SVgroup
CreateTable
Batch
;
static
void
destroyCreateTbReq
(
SVCreateTbReq
*
pReq
)
{
taosMemoryFreeClear
(
pReq
->
name
);
...
...
@@ -3146,7 +3146,7 @@ static void destroyCreateTbReq(SVCreateTbReq* pReq) {
}
static
int32_t
buildNormalTableBatchReq
(
int32_t
acctId
,
const
SCreateTableStmt
*
pStmt
,
const
SVgroupInfo
*
pVgroupInfo
,
SVgroup
Tables
Batch
*
pBatch
)
{
SVgroup
CreateTable
Batch
*
pBatch
)
{
char
dbFName
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
SName
name
=
{.
type
=
TSDB_DB_NAME_T
,
.
acctId
=
acctId
};
strcpy
(
name
.
dbname
,
pStmt
->
dbName
);
...
...
@@ -3180,13 +3180,13 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
serializeVgroup
TablesBatch
(
SVgroupTables
Batch
*
pTbBatch
,
SArray
*
pBufArray
)
{
static
int32_t
serializeVgroup
CreateTableBatch
(
SVgroupCreateTable
Batch
*
pTbBatch
,
SArray
*
pBufArray
)
{
int
tlen
;
SCoder
coder
=
{
0
};
int32_t
ret
=
0
;
tEncodeSize
(
tEncodeSVCreateTbBatchReq
,
&
pTbBatch
->
req
,
tlen
,
ret
);
tlen
+=
sizeof
(
SMsgHead
);
//+ tSerializeSVCreateTbBatchReq(NULL, &(pTbBatch->req));
tlen
+=
sizeof
(
SMsgHead
);
void
*
buf
=
taosMemoryMalloc
(
tlen
);
if
(
NULL
==
buf
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -3212,7 +3212,7 @@ static int32_t serializeVgroupTablesBatch(SVgroupTablesBatch* pTbBatch, SArray*
return
TSDB_CODE_SUCCESS
;
}
static
void
destroyCreateTbReqBatch
(
SVgroup
Tables
Batch
*
pTbBatch
)
{
static
void
destroyCreateTbReqBatch
(
SVgroup
CreateTable
Batch
*
pTbBatch
)
{
size_t
size
=
taosArrayGetSize
(
pTbBatch
->
req
.
pArray
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SVCreateTbReq
*
pTableReq
=
taosArrayGet
(
pTbBatch
->
req
.
pArray
,
i
);
...
...
@@ -3257,10 +3257,10 @@ static int32_t buildCreateTableDataBlock(int32_t acctId, const SCreateTableStmt*
return
TSDB_CODE_OUT_OF_MEMORY
;
}
SVgroup
Tables
Batch
tbatch
=
{
0
};
int32_t
code
=
buildNormalTableBatchReq
(
acctId
,
pStmt
,
pInfo
,
&
tbatch
);
SVgroup
CreateTable
Batch
tbatch
=
{
0
};
int32_t
code
=
buildNormalTableBatchReq
(
acctId
,
pStmt
,
pInfo
,
&
tbatch
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
serializeVgroup
Tables
Batch
(
&
tbatch
,
*
pBufArray
);
code
=
serializeVgroup
CreateTable
Batch
(
&
tbatch
,
*
pBufArray
);
}
destroyCreateTbReqBatch
(
&
tbatch
);
...
...
@@ -3305,9 +3305,9 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, c
req
.
ctb
.
suid
=
suid
;
req
.
ctb
.
pTag
=
row
;
SVgroup
Tables
Batch
*
pTableBatch
=
taosHashGet
(
pVgroupHashmap
,
&
pVgInfo
->
vgId
,
sizeof
(
pVgInfo
->
vgId
));
SVgroup
CreateTable
Batch
*
pTableBatch
=
taosHashGet
(
pVgroupHashmap
,
&
pVgInfo
->
vgId
,
sizeof
(
pVgInfo
->
vgId
));
if
(
pTableBatch
==
NULL
)
{
SVgroup
Tables
Batch
tBatch
=
{
0
};
SVgroup
CreateTable
Batch
tBatch
=
{
0
};
tBatch
.
info
=
*
pVgInfo
;
strcpy
(
tBatch
.
dbName
,
pDbName
);
...
...
@@ -3480,21 +3480,21 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
return
code
;
}
static
SArray
*
serializeVgroups
Tables
Batch
(
int32_t
acctId
,
SHashObj
*
pVgroupHashmap
)
{
static
SArray
*
serializeVgroups
CreateTable
Batch
(
int32_t
acctId
,
SHashObj
*
pVgroupHashmap
)
{
SArray
*
pBufArray
=
taosArrayInit
(
taosHashGetSize
(
pVgroupHashmap
),
sizeof
(
void
*
));
if
(
NULL
==
pBufArray
)
{
return
NULL
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
SVgroup
Tables
Batch
*
pTbBatch
=
NULL
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
SVgroup
CreateTable
Batch
*
pTbBatch
=
NULL
;
do
{
pTbBatch
=
taosHashIterate
(
pVgroupHashmap
,
pTbBatch
);
if
(
pTbBatch
==
NULL
)
{
break
;
}
serializeVgroup
Tables
Batch
(
pTbBatch
,
pBufArray
);
serializeVgroup
CreateTable
Batch
(
pTbBatch
,
pBufArray
);
destroyCreateTbReqBatch
(
pTbBatch
);
}
while
(
true
);
...
...
@@ -3519,7 +3519,143 @@ static int32_t rewriteCreateMultiTable(STranslateContext* pCxt, SQuery* pQuery)
}
}
SArray
*
pBufArray
=
serializeVgroupsTablesBatch
(
pCxt
->
pParseCxt
->
acctId
,
pVgroupHashmap
);
SArray
*
pBufArray
=
serializeVgroupsCreateTableBatch
(
pCxt
->
pParseCxt
->
acctId
,
pVgroupHashmap
);
taosHashCleanup
(
pVgroupHashmap
);
if
(
NULL
==
pBufArray
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
return
rewriteToVnodeModifOpStmt
(
pQuery
,
pBufArray
);
}
typedef
struct
SVgroupDropTableBatch
{
SVDropTbBatchReq
req
;
SVgroupInfo
info
;
char
dbName
[
TSDB_DB_NAME_LEN
];
}
SVgroupDropTableBatch
;
static
void
addDropTbReqIntoVgroup
(
SHashObj
*
pVgroupHashmap
,
SDropTableClause
*
pClause
,
SVgroupInfo
*
pVgInfo
)
{
SVDropTbReq
req
=
{.
name
=
pClause
->
tableName
,
.
igNotExists
=
pClause
->
ignoreNotExists
};
SVgroupDropTableBatch
*
pTableBatch
=
taosHashGet
(
pVgroupHashmap
,
&
pVgInfo
->
vgId
,
sizeof
(
pVgInfo
->
vgId
));
if
(
NULL
==
pTableBatch
)
{
SVgroupDropTableBatch
tBatch
=
{
0
};
tBatch
.
info
=
*
pVgInfo
;
tBatch
.
req
.
pArray
=
taosArrayInit
(
TARRAY_MIN_SIZE
,
sizeof
(
SVDropTbReq
));
taosArrayPush
(
tBatch
.
req
.
pArray
,
&
req
);
taosHashPut
(
pVgroupHashmap
,
&
pVgInfo
->
vgId
,
sizeof
(
pVgInfo
->
vgId
),
&
tBatch
,
sizeof
(
tBatch
));
}
else
{
// add to the correct vgroup
taosArrayPush
(
pTableBatch
->
req
.
pArray
,
&
req
);
}
}
static
int32_t
buildDropTableVgroupHashmap
(
STranslateContext
*
pCxt
,
SDropTableClause
*
pClause
,
bool
*
pIsSuperTable
,
SHashObj
*
pVgroupHashmap
)
{
STableMeta
*
pTableMeta
=
NULL
;
int32_t
code
=
getTableMeta
(
pCxt
,
pClause
->
dbName
,
pClause
->
tableName
,
&
pTableMeta
);
if
(
TSDB_CODE_SUCCESS
==
code
&&
TSDB_SUPER_TABLE
==
pTableMeta
->
tableType
)
{
*
pIsSuperTable
=
true
;
goto
over
;
}
*
pIsSuperTable
=
false
;
SVgroupInfo
info
=
{
0
};
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
getTableHashVgroup
(
pCxt
,
pClause
->
dbName
,
pClause
->
tableName
,
&
info
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
addDropTbReqIntoVgroup
(
pVgroupHashmap
,
pClause
,
&
info
);
}
over:
taosMemoryFreeClear
(
pTableMeta
);
return
code
;
}
static
void
destroyDropTbReqBatch
(
SVgroupDropTableBatch
*
pTbBatch
)
{
taosArrayDestroy
(
pTbBatch
->
req
.
pArray
);
}
static
int32_t
serializeVgroupDropTableBatch
(
SVgroupDropTableBatch
*
pTbBatch
,
SArray
*
pBufArray
)
{
int
tlen
;
SCoder
coder
=
{
0
};
int32_t
ret
=
0
;
tEncodeSize
(
tEncodeSVDropTbBatchReq
,
&
pTbBatch
->
req
,
tlen
,
ret
);
tlen
+=
sizeof
(
SMsgHead
);
void
*
buf
=
taosMemoryMalloc
(
tlen
);
if
(
NULL
==
buf
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
((
SMsgHead
*
)
buf
)
->
vgId
=
htonl
(
pTbBatch
->
info
.
vgId
);
((
SMsgHead
*
)
buf
)
->
contLen
=
htonl
(
tlen
);
void
*
pBuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
tCoderInit
(
&
coder
,
TD_LITTLE_ENDIAN
,
pBuf
,
tlen
-
sizeof
(
SMsgHead
),
TD_ENCODER
);
tEncodeSVDropTbBatchReq
(
&
coder
,
&
pTbBatch
->
req
);
tCoderClear
(
&
coder
);
SVgDataBlocks
*
pVgData
=
taosMemoryCalloc
(
1
,
sizeof
(
SVgDataBlocks
));
if
(
NULL
==
pVgData
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pVgData
->
vg
=
pTbBatch
->
info
;
pVgData
->
pData
=
buf
;
pVgData
->
size
=
tlen
;
pVgData
->
numOfTables
=
(
int32_t
)
taosArrayGetSize
(
pTbBatch
->
req
.
pArray
);
taosArrayPush
(
pBufArray
,
&
pVgData
);
return
TSDB_CODE_SUCCESS
;
}
static
SArray
*
serializeVgroupsDropTableBatch
(
int32_t
acctId
,
SHashObj
*
pVgroupHashmap
)
{
SArray
*
pBufArray
=
taosArrayInit
(
taosHashGetSize
(
pVgroupHashmap
),
sizeof
(
void
*
));
if
(
NULL
==
pBufArray
)
{
return
NULL
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
SVgroupDropTableBatch
*
pTbBatch
=
NULL
;
do
{
pTbBatch
=
taosHashIterate
(
pVgroupHashmap
,
pTbBatch
);
if
(
pTbBatch
==
NULL
)
{
break
;
}
serializeVgroupDropTableBatch
(
pTbBatch
,
pBufArray
);
destroyDropTbReqBatch
(
pTbBatch
);
}
while
(
true
);
return
pBufArray
;
}
static
int32_t
rewriteDropTable
(
STranslateContext
*
pCxt
,
SQuery
*
pQuery
)
{
SDropTableStmt
*
pStmt
=
(
SDropTableStmt
*
)
pQuery
->
pRoot
;
SHashObj
*
pVgroupHashmap
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
,
HASH_NO_LOCK
);
if
(
NULL
==
pVgroupHashmap
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
bool
isSuperTable
=
false
;
SNode
*
pNode
;
FOREACH
(
pNode
,
pStmt
->
pTables
)
{
int32_t
code
=
buildDropTableVgroupHashmap
(
pCxt
,
(
SDropTableClause
*
)
pNode
,
&
isSuperTable
,
pVgroupHashmap
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
taosHashCleanup
(
pVgroupHashmap
);
return
code
;
}
if
(
isSuperTable
&&
LIST_LENGTH
(
pStmt
->
pTables
)
>
1
)
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_DROP_STABLE
);
}
}
if
(
isSuperTable
)
{
taosHashCleanup
(
pVgroupHashmap
);
return
TSDB_CODE_SUCCESS
;
}
SArray
*
pBufArray
=
serializeVgroupsDropTableBatch
(
pCxt
->
pParseCxt
->
acctId
,
pVgroupHashmap
);
taosHashCleanup
(
pVgroupHashmap
);
if
(
NULL
==
pBufArray
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -3565,6 +3701,9 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
case
QUERY_NODE_CREATE_MULTI_TABLE_STMT
:
code
=
rewriteCreateMultiTable
(
pCxt
,
pQuery
);
break
;
case
QUERY_NODE_DROP_TABLE_STMT
:
code
=
rewriteDropTable
(
pCxt
,
pQuery
);
break
;
case
QUERY_NODE_ALTER_TABLE_STMT
:
if
(
TSDB_ALTER_TABLE_UPDATE_TAG_VAL
==
((
SAlterTableStmt
*
)
pQuery
->
pRoot
)
->
alterType
)
{
code
=
rewriteAlterTable
(
pCxt
,
pQuery
);
...
...
source/libs/parser/src/parUtil.c
浏览文件 @
a1e1c089
...
...
@@ -126,6 +126,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return
"slimit/soffset only available for PARTITION BY query"
;
case
TSDB_CODE_PAR_INVALID_TOPIC_QUERY
:
return
"Invalid topic query"
;
case
TSDB_CODE_PAR_INVALID_DROP_STABLE
:
return
"Cannot drop super table in batch"
;
case
TSDB_CODE_OUT_OF_MEMORY
:
return
"Out of memory"
;
default:
...
...
source/libs/planner/src/planLogicCreater.c
浏览文件 @
a1e1c089
...
...
@@ -944,9 +944,16 @@ static int32_t createSetOperatorLogicNode(SLogicPlanContext* pCxt, SSetOperator*
}
static
int32_t
getMsgType
(
ENodeType
sqlType
)
{
return
(
QUERY_NODE_CREATE_TABLE_STMT
==
sqlType
||
QUERY_NODE_CREATE_MULTI_TABLE_STMT
==
sqlType
)
?
TDMT_VND_CREATE_TABLE
:
TDMT_VND_SUBMIT
;
switch
(
sqlType
)
{
case
QUERY_NODE_CREATE_TABLE_STMT
:
case
QUERY_NODE_CREATE_MULTI_TABLE_STMT
:
return
TDMT_VND_CREATE_TABLE
;
case
QUERY_NODE_DROP_TABLE_STMT
:
return
TDMT_VND_DROP_TABLE
;
default:
break
;
}
return
TDMT_VND_SUBMIT
;
}
static
int32_t
createVnodeModifLogicNode
(
SLogicPlanContext
*
pCxt
,
SVnodeModifOpStmt
*
pStmt
,
SLogicNode
**
pLogicNode
)
{
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
a1e1c089
...
...
@@ -245,6 +245,7 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
SCH_SET_TASK_LASTMSG_TYPE
(
pTask
,
-
1
);
return
TSDB_CODE_SUCCESS
;
case
TDMT_VND_CREATE_TABLE_RSP
:
case
TDMT_VND_DROP_TABLE_RSP
:
case
TDMT_VND_SUBMIT_RSP
:
break
;
default:
...
...
@@ -369,7 +370,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
}
for
(
int32_t
n
=
0
;
n
<
childNum
;
++
n
)
{
SSubplan
*
child
=
(
SSubplan
*
)
nodesListGetNode
(
pPlan
->
pChildren
,
n
);
SSubplan
*
child
=
(
SSubplan
*
)
nodesListGetNode
(
pPlan
->
pChildren
,
n
);
SSchTask
**
childTask
=
taosHashGet
(
planToTask
,
&
child
,
POINTER_BYTES
);
if
(
NULL
==
childTask
||
NULL
==
*
childTask
)
{
SCH_TASK_ELOG
(
"subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d"
,
i
,
m
,
n
);
...
...
@@ -401,7 +402,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
}
for
(
int32_t
n
=
0
;
n
<
parentNum
;
++
n
)
{
SSubplan
*
parent
=
(
SSubplan
*
)
nodesListGetNode
(
pPlan
->
pParents
,
n
);
SSubplan
*
parent
=
(
SSubplan
*
)
nodesListGetNode
(
pPlan
->
pParents
,
n
);
SSchTask
**
parentTask
=
taosHashGet
(
planToTask
,
&
parent
,
POINTER_BYTES
);
if
(
NULL
==
parentTask
||
NULL
==
*
parentTask
)
{
SCH_TASK_ELOG
(
"subplan parent relationship error, level:%d, taskIdx:%d, childIdx:%d"
,
i
,
m
,
n
);
...
...
@@ -491,7 +492,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SSchLevel
level
=
{
0
};
SNodeListNode
*
plans
=
NULL
;
int32_t
taskNum
=
0
;
SSchLevel
*
pLevel
=
NULL
;
SSchLevel
*
pLevel
=
NULL
;
level
.
status
=
JOB_TASK_STATUS_NOT_START
;
...
...
@@ -1094,6 +1095,30 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_ERR_RET
(
schProcessOnTaskSuccess
(
pJob
,
pTask
));
break
;
}
case
TDMT_VND_DROP_TABLE_RSP
:
{
SVDropTbBatchRsp
batchRsp
=
{
0
};
if
(
msg
)
{
SCoder
coder
=
{
0
};
tCoderInit
(
&
coder
,
TD_LITTLE_ENDIAN
,
msg
,
msgSize
,
TD_DECODER
);
code
=
tDecodeSVDropTbBatchRsp
(
&
coder
,
&
batchRsp
);
if
(
TSDB_CODE_SUCCESS
==
code
&&
batchRsp
.
pArray
)
{
int32_t
num
=
taosArrayGetSize
(
batchRsp
.
pArray
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SVDropTbRsp
*
rsp
=
taosArrayGet
(
batchRsp
.
pArray
,
i
);
if
(
NEED_CLIENT_HANDLE_ERROR
(
rsp
->
code
))
{
tCoderClear
(
&
coder
);
SCH_ERR_JRET
(
rsp
->
code
);
}
}
}
tCoderClear
(
&
coder
);
SCH_ERR_JRET
(
code
);
}
SCH_ERR_JRET
(
rspCode
);
SCH_ERR_RET
(
schProcessOnTaskSuccess
(
pJob
,
pTask
));
break
;
}
case
TDMT_VND_SUBMIT_RSP
:
{
if
(
msg
)
{
SSubmitRsp
*
rsp
=
(
SSubmitRsp
*
)
msg
;
...
...
@@ -1267,7 +1292,7 @@ int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCo
int32_t
schHandleCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
msgType
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
SSchTaskCallbackParam
*
pParam
=
(
SSchTaskCallbackParam
*
)
param
;
SSchTask
*
pTask
=
NULL
;
SSchTask
*
pTask
=
NULL
;
SSchJob
*
pJob
=
schAcquireJob
(
pParam
->
refId
);
if
(
NULL
==
pJob
)
{
...
...
@@ -1316,6 +1341,10 @@ int32_t schHandleCreateTableCallback(void *param, const SDataBuf *pMsg, int32_t
return
schHandleCallback
(
param
,
pMsg
,
TDMT_VND_CREATE_TABLE_RSP
,
code
);
}
int32_t
schHandleDropTableCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
return
schHandleCallback
(
param
,
pMsg
,
TDMT_VND_DROP_TABLE_RSP
,
code
);
}
int32_t
schHandleQueryCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
return
schHandleCallback
(
param
,
pMsg
,
TDMT_VND_QUERY_RSP
,
code
);
}
...
...
@@ -1412,6 +1441,9 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
case
TDMT_VND_CREATE_TABLE
:
*
fp
=
schHandleCreateTableCallback
;
break
;
case
TDMT_VND_DROP_TABLE
:
*
fp
=
schHandleDropTableCallback
;
break
;
case
TDMT_VND_SUBMIT
:
*
fp
=
schHandleSubmitCallback
;
break
;
...
...
@@ -1617,8 +1649,8 @@ _return:
int32_t
schMakeHbRpcCtx
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SRpcCtx
*
pCtx
)
{
int32_t
code
=
0
;
SSchHbCallbackParam
*
param
=
NULL
;
SMsgSendInfo
*
pMsgSendInfo
=
NULL
;
SQueryNodeAddr
*
addr
=
taosArrayGet
(
pTask
->
candidateAddrs
,
pTask
->
candidateIdx
);
SMsgSendInfo
*
pMsgSendInfo
=
NULL
;
SQueryNodeAddr
*
addr
=
taosArrayGet
(
pTask
->
candidateAddrs
,
pTask
->
candidateIdx
);
SQueryNodeEpId
epId
=
{
0
};
epId
.
nodeId
=
addr
->
nodeId
;
...
...
@@ -1759,10 +1791,10 @@ int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) {
}
SRpcCtxVal
dst
=
{
0
};
void
*
pIter
=
taosHashIterate
(
pSrc
->
args
,
NULL
);
void
*
pIter
=
taosHashIterate
(
pSrc
->
args
,
NULL
);
while
(
pIter
)
{
SRpcCtxVal
*
pVal
=
(
SRpcCtxVal
*
)
pIter
;
int32_t
*
msgType
=
taosHashGetKey
(
pIter
,
NULL
);
int32_t
*
msgType
=
taosHashGetKey
(
pIter
,
NULL
);
dst
=
*
pVal
;
dst
.
val
=
NULL
;
...
...
@@ -1916,7 +1948,7 @@ _return:
int32_t
schBuildAndSendMsg
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SQueryNodeAddr
*
addr
,
int32_t
msgType
)
{
uint32_t
msgSize
=
0
;
void
*
msg
=
NULL
;
void
*
msg
=
NULL
;
int32_t
code
=
0
;
bool
isCandidateAddr
=
false
;
bool
persistHandle
=
false
;
...
...
@@ -1931,6 +1963,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
switch
(
msgType
)
{
case
TDMT_VND_CREATE_TABLE
:
case
TDMT_VND_DROP_TABLE
:
case
TDMT_VND_SUBMIT
:
{
msgSize
=
pTask
->
msgLen
;
msg
=
taosMemoryCalloc
(
1
,
msgSize
);
...
...
@@ -2673,7 +2706,7 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
SSchLevel
*
pLevel
=
taosArrayGet
(
pJob
->
levels
,
i
);
for
(
int32_t
m
=
0
;
m
<
pLevel
->
taskNum
;
++
m
)
{
SSchTask
*
pTask
=
taosArrayGet
(
pLevel
->
subTasks
,
m
);
SSchTask
*
pTask
=
taosArrayGet
(
pLevel
->
subTasks
,
m
);
SQuerySubDesc
subDesc
=
{.
tid
=
pTask
->
taskId
,
.
status
=
pTask
->
status
};
taosArrayPush
(
pSub
,
&
subDesc
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录