Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9450c959
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9450c959
编写于
6月 07, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
get table index
上级
b875f37f
变更
13
显示空白变更内容
内联
并排
Showing
13 changed file
with
370 addition
and
32 deletion
+370
-32
include/common/tmsg.h
include/common/tmsg.h
+26
-0
include/common/tmsgdef.h
include/common/tmsgdef.h
+1
-0
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+2
-1
source/common/src/tmsg.c
source/common/src/tmsg.c
+96
-0
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
+1
-0
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+1
-0
source/dnode/mnode/impl/src/mndSma.c
source/dnode/mnode/impl/src/mndSma.c
+104
-0
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+1
-0
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+11
-0
source/libs/catalog/src/ctgRemote.c
source/libs/catalog/src/ctgRemote.c
+53
-0
source/libs/qcom/src/querymsg.c
source/libs/qcom/src/querymsg.c
+52
-17
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+1
-0
source/libs/scheduler/src/schJob.c
source/libs/scheduler/src/schJob.c
+21
-14
未找到文件。
include/common/tmsg.h
浏览文件 @
9450c959
...
...
@@ -2459,6 +2459,32 @@ typedef struct {
int32_t
tSerializeSUserIndexRsp
(
void
*
buf
,
int32_t
bufLen
,
const
SUserIndexRsp
*
pRsp
);
int32_t
tDeserializeSUserIndexRsp
(
void
*
buf
,
int32_t
bufLen
,
SUserIndexRsp
*
pRsp
);
typedef
struct
{
char
tbFName
[
TSDB_TABLE_FNAME_LEN
];
}
STableIndexReq
;
int32_t
tSerializeSTableIndexReq
(
void
*
buf
,
int32_t
bufLen
,
STableIndexReq
*
pReq
);
int32_t
tDeserializeSTableIndexReq
(
void
*
buf
,
int32_t
bufLen
,
STableIndexReq
*
pReq
);
typedef
struct
{
int8_t
intervalUnit
;
int8_t
slidingUnit
;
int64_t
interval
;
int64_t
offset
;
int64_t
sliding
;
int64_t
dstTbUid
;
int32_t
dstVgId
;
// for stream
char
*
expr
;
}
STableIndexInfo
;
typedef
struct
{
SArray
*
pIndex
;
}
STableIndexRsp
;
int32_t
tSerializeSTableIndexRsp
(
void
*
buf
,
int32_t
bufLen
,
const
STableIndexRsp
*
pRsp
);
int32_t
tDeserializeSTableIndexRsp
(
void
*
buf
,
int32_t
bufLen
,
STableIndexRsp
*
pRsp
);
typedef
struct
{
int8_t
mqMsgType
;
int32_t
code
;
...
...
include/common/tmsgdef.h
浏览文件 @
9450c959
...
...
@@ -130,6 +130,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_MND_CREATE_INDEX
,
"create-index"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_INDEX
,
"drop-index"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_GET_INDEX
,
"get-index"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_GET_TABLE_INDEX
,
"get-table-index"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_CREATE_TOPIC
,
"create-topic"
,
SMCreateTopicReq
,
SMCreateTopicRsp
)
TD_DEF_MSG_TYPE
(
TDMT_MND_ALTER_TOPIC
,
"alter-topic"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_TOPIC
,
"drop-topic"
,
NULL
,
NULL
)
...
...
include/libs/catalog/catalog.h
浏览文件 @
9450c959
...
...
@@ -272,6 +272,8 @@ int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
int32_t
catalogGetIndexMeta
(
SCatalog
*
pCtg
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
indexName
,
SIndexInfo
*
pInfo
);
int32_t
catalogGetTableIndex
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
const
char
*
tbFName
,
SArray
**
pRes
);
int32_t
catalogGetUdfInfo
(
SCatalog
*
pCtg
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
funcName
,
SFuncInfo
*
pInfo
);
int32_t
catalogChkAuth
(
SCatalog
*
pCtg
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
user
,
const
char
*
dbFName
,
AUTH_TYPE
type
,
bool
*
pass
);
...
...
@@ -280,7 +282,6 @@ int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth);
int32_t
catalogUpdateVgEpSet
(
SCatalog
*
pCtg
,
const
char
*
dbFName
,
int32_t
vgId
,
SEpSet
*
epSet
);
int32_t
ctgdLaunchAsyncCall
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
uint64_t
reqId
,
bool
forceUpdate
);
...
...
source/common/src/tmsg.c
浏览文件 @
9450c959
...
...
@@ -2392,6 +2392,102 @@ int32_t tDeserializeSUserIndexRsp(void *buf, int32_t bufLen, SUserIndexRsp *pRsp
return
0
;
}
int32_t
tSerializeSTableIndexReq
(
void
*
buf
,
int32_t
bufLen
,
STableIndexReq
*
pReq
)
{
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
tbFName
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tEncoderClear
(
&
encoder
);
return
tlen
;
}
int32_t
tDeserializeSTableIndexReq
(
void
*
buf
,
int32_t
bufLen
,
STableIndexReq
*
pReq
)
{
SDecoder
decoder
=
{
0
};
tDecoderInit
(
&
decoder
,
buf
,
bufLen
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
tbFName
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
return
0
;
}
int32_t
tSerializeSTableIndexInfo
(
SEncoder
*
pEncoder
,
STableIndexInfo
*
pInfo
)
{
if
(
tEncodeI8
(
pEncoder
,
pInfo
->
intervalUnit
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pInfo
->
slidingUnit
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pInfo
->
interval
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pInfo
->
offset
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pInfo
->
sliding
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pInfo
->
dstTbUid
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pInfo
->
dstVgId
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pInfo
->
expr
)
<
0
)
return
-
1
;
return
0
;
}
int32_t
tSerializeSTableIndexRsp
(
void
*
buf
,
int32_t
bufLen
,
const
STableIndexRsp
*
pRsp
)
{
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
int32_t
num
=
taosArrayGetSize
(
pRsp
->
pIndex
);
if
(
tEncodeI32
(
&
encoder
,
num
)
<
0
)
return
-
1
;
if
(
num
>
0
)
{
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
STableIndexInfo
*
pInfo
=
(
STableIndexInfo
*
)
taosArrayGet
(
pRsp
->
pIndex
,
i
);
if
(
tSerializeSTableIndexInfo
(
&
encoder
,
pInfo
)
<
0
)
return
-
1
;
}
}
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tEncoderClear
(
&
encoder
);
return
tlen
;
}
int32_t
tDeserializeSTableIndexInfo
(
SDecoder
*
pDecoder
,
STableIndexInfo
*
pInfo
)
{
if
(
tDecodeI8
(
pDecoder
,
&
pInfo
->
intervalUnit
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pInfo
->
slidingUnit
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pInfo
->
interval
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pInfo
->
offset
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pInfo
->
sliding
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pInfo
->
dstTbUid
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pInfo
->
dstVgId
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pInfo
->
expr
)
<
0
)
return
-
1
;
return
0
;
}
int32_t
tDeserializeSTableIndexRsp
(
void
*
buf
,
int32_t
bufLen
,
STableIndexRsp
*
pRsp
)
{
SDecoder
decoder
=
{
0
};
tDecoderInit
(
&
decoder
,
buf
,
bufLen
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
int32_t
num
=
0
;
if
(
tDecodeI32
(
&
decoder
,
&
num
)
<
0
)
return
-
1
;
if
(
num
>
0
)
{
pRsp
->
pIndex
=
taosArrayInit
(
num
,
sizeof
(
STableIndexInfo
));
if
(
NULL
==
pRsp
->
pIndex
)
return
-
1
;
STableIndexInfo
info
;
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
if
(
tDeserializeSTableIndexInfo
(
&
decoder
,
&
info
)
<
0
)
return
-
1
;
if
(
NULL
==
taosArrayPush
(
pRsp
->
pIndex
,
&
info
))
{
taosMemoryFree
(
info
.
expr
);
return
-
1
;
}
}
}
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
return
0
;
}
int32_t
tSerializeSShowReq
(
void
*
buf
,
int32_t
bufLen
,
SShowReq
*
pReq
)
{
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
...
...
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
浏览文件 @
9450c959
...
...
@@ -181,6 +181,7 @@ SArray *mmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_SMA
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_STREAM
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_GET_INDEX
,
mmPutNodeMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_GET_TABLE_INDEX
,
mmPutNodeMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_TOPIC
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_ALTER_TOPIC
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_TOPIC
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
9450c959
...
...
@@ -309,6 +309,7 @@ typedef struct {
int8_t
slidingUnit
;
int8_t
timezone
;
int32_t
dstVgId
;
// for stream
int64_t
dstTbUid
;
int64_t
interval
;
int64_t
offset
;
int64_t
sliding
;
...
...
source/dnode/mnode/impl/src/mndSma.c
浏览文件 @
9450c959
...
...
@@ -40,6 +40,7 @@ static int32_t mndSmaGetVgEpSet(SMnode *pMnode, SDbObj *pDb, SVgEpSet **ppVgEpS
static
int32_t
mndProcessMCreateSmaReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessMDropSmaReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessGetSmaReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessGetTbSmaReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndRetrieveSma
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
void
mndCancelGetNextSma
(
SMnode
*
pMnode
,
void
*
pIter
);
...
...
@@ -59,6 +60,7 @@ int32_t mndInitSma(SMnode *pMnode) {
mndSetMsgHandle
(
pMnode
,
TDMT_VND_CREATE_SMA_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_DROP_SMA_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_GET_INDEX
,
mndProcessGetSmaReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_GET_TABLE_INDEX
,
mndProcessGetTbSmaReq
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_INDEX
,
mndRetrieveSma
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_INDEX
,
mndCancelGetNextSma
);
...
...
@@ -870,6 +872,55 @@ static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp
return
code
;
}
static
int32_t
mndGetTableSma
(
SMnode
*
pMnode
,
STableIndexReq
*
indexReq
,
STableIndexRsp
*
rsp
,
bool
*
exist
)
{
int32_t
code
=
0
;
SSmaObj
*
pSma
=
NULL
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
void
*
pIter
=
NULL
;
STableIndexInfo
info
;
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_SMA
,
pIter
,
(
void
**
)
&
pSma
);
if
(
pIter
==
NULL
)
break
;
if
(
pSma
->
stb
[
0
]
!=
indexReq
->
tbFName
[
0
]
||
strcmp
(
pSma
->
stb
,
indexReq
->
tbFName
))
{
continue
;
}
info
.
intervalUnit
=
pSma
->
intervalUnit
;
info
.
slidingUnit
=
pSma
->
slidingUnit
;
info
.
interval
=
pSma
->
interval
;
info
.
offset
=
pSma
->
offset
;
info
.
sliding
=
pSma
->
sliding
;
info
.
dstTbUid
=
pSma
->
dstTbUid
;
info
.
dstVgId
=
pSma
->
dstVgId
;
info
.
expr
=
taosMemoryMalloc
(
pSma
->
exprLen
+
1
);
if
(
info
.
expr
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
-
1
;
sdbRelease
(
pSdb
,
pSma
);
return
code
;
}
memcpy
(
info
.
expr
,
pSma
->
expr
,
pSma
->
exprLen
);
info
.
expr
[
pSma
->
exprLen
]
=
0
;
if
(
NULL
==
taosArrayPush
(
rsp
->
pIndex
,
&
info
))
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
-
1
;
taosMemoryFree
(
info
.
expr
);
sdbRelease
(
pSdb
,
pSma
);
return
code
;
}
*
exist
=
true
;
sdbRelease
(
pSdb
,
pSma
);
}
return
code
;
}
static
int32_t
mndProcessGetSmaReq
(
SRpcMsg
*
pReq
)
{
SUserIndexReq
indexReq
=
{
0
};
SMnode
*
pMnode
=
pReq
->
info
.
node
;
...
...
@@ -916,6 +967,59 @@ _OVER:
return
code
;
}
static
int32_t
mndProcessGetTbSmaReq
(
SRpcMsg
*
pReq
)
{
STableIndexReq
indexReq
=
{
0
};
SMnode
*
pMnode
=
pReq
->
info
.
node
;
int32_t
code
=
-
1
;
STableIndexRsp
rsp
=
{
0
};
bool
exist
=
false
;
if
(
tDeserializeSTableIndexReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
indexReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
goto
_OVER
;
}
rsp
.
pIndex
=
taosArrayInit
(
10
,
sizeof
(
STableIndexInfo
));
if
(
NULL
==
rsp
.
pIndex
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
-
1
;
goto
_OVER
;
}
code
=
mndGetTableSma
(
pMnode
,
&
indexReq
,
&
rsp
,
&
exist
);
if
(
code
)
{
goto
_OVER
;
}
if
(
!
exist
)
{
code
=
-
1
;
terrno
=
TSDB_CODE_MND_DB_INDEX_NOT_EXIST
;
}
else
{
int32_t
contLen
=
tSerializeSTableIndexRsp
(
NULL
,
0
,
&
rsp
);
void
*
pRsp
=
rpcMallocCont
(
contLen
);
if
(
pRsp
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
-
1
;
goto
_OVER
;
}
tSerializeSTableIndexRsp
(
pRsp
,
contLen
,
&
rsp
);
pReq
->
info
.
rsp
=
pRsp
;
pReq
->
info
.
rspLen
=
contLen
;
code
=
0
;
}
_OVER:
if
(
code
!=
0
)
{
mError
(
"failed to get table index %s since %s"
,
indexReq
.
tbFName
,
terrstr
());
}
return
code
;
}
static
int32_t
mndRetrieveSma
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
)
{
SMnode
*
pMnode
=
pReq
->
info
.
node
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
9450c959
...
...
@@ -490,6 +490,7 @@ int32_t ctgGetDBVgInfoFromMnode(CTG_PARAMS, SBuildUseDBInput *input, SUseDbOutpu
int32_t
ctgGetQnodeListFromMnode
(
CTG_PARAMS
,
SArray
*
out
,
SCtgTask
*
pTask
);
int32_t
ctgGetDBCfgFromMnode
(
CTG_PARAMS
,
const
char
*
dbFName
,
SDbCfgInfo
*
out
,
SCtgTask
*
pTask
);
int32_t
ctgGetIndexInfoFromMnode
(
CTG_PARAMS
,
const
char
*
indexName
,
SIndexInfo
*
out
,
SCtgTask
*
pTask
);
int32_t
ctgGetTbIndexFromMnode
(
CTG_PARAMS
,
const
char
*
tbFName
,
SArray
**
out
,
SCtgTask
*
pTask
);
int32_t
ctgGetUdfInfoFromMnode
(
CTG_PARAMS
,
const
char
*
funcName
,
SFuncInfo
*
out
,
SCtgTask
*
pTask
);
int32_t
ctgGetUserDbAuthFromMnode
(
CTG_PARAMS
,
const
char
*
user
,
SGetUserAuthRsp
*
out
,
SCtgTask
*
pTask
);
int32_t
ctgGetTbMetaFromMnodeImpl
(
CTG_PARAMS
,
char
*
dbFName
,
char
*
tbName
,
STableMetaOutput
*
out
,
SCtgTask
*
pTask
);
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
9450c959
...
...
@@ -1136,6 +1136,17 @@ int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps
CTG_API_LEAVE
(
ctgGetIndexInfoFromMnode
(
CTG_PARAMS_LIST
(),
indexName
,
pInfo
,
NULL
));
}
int32_t
catalogGetTableIndex
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
const
char
*
tbFName
,
SArray
**
pRes
)
{
CTG_API_ENTER
();
if
(
NULL
==
pCtg
||
NULL
==
pTrans
||
NULL
==
pMgmtEps
||
NULL
==
tbFName
||
NULL
==
pRes
)
{
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
CTG_API_LEAVE
(
ctgGetTbIndexFromMnode
(
CTG_PARAMS_LIST
(),
tbFName
,
pRes
,
NULL
));
}
int32_t
catalogGetUdfInfo
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
const
char
*
funcName
,
SFuncInfo
*
pInfo
)
{
CTG_API_ENTER
();
...
...
source/libs/catalog/src/ctgRemote.c
浏览文件 @
9450c959
...
...
@@ -85,6 +85,21 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qDebug
(
"Got index from mnode, indexName:%s"
,
target
);
break
;
}
case
TDMT_MND_GET_TABLE_INDEX
:
{
if
(
TSDB_CODE_SUCCESS
!=
rspCode
)
{
qError
(
"error rsp for get table index, error:%s, tbFName:%s"
,
tstrerror
(
rspCode
),
target
);
CTG_ERR_RET
(
rspCode
);
}
code
=
queryProcessMsgRsp
[
TMSG_INDEX
(
reqType
)](
out
,
msg
,
msgSize
);
if
(
code
)
{
qError
(
"Process get table index rsp failed, error:%s, tbFName:%s"
,
tstrerror
(
code
),
target
);
CTG_ERR_RET
(
code
);
}
qDebug
(
"Got table index from mnode, tbFName:%s"
,
target
);
break
;
}
case
TDMT_MND_RETRIEVE_FUNC
:
{
if
(
TSDB_CODE_SUCCESS
!=
rspCode
)
{
qError
(
"error rsp for get udf, error:%s, funcName:%s"
,
tstrerror
(
rspCode
),
target
);
...
...
@@ -412,6 +427,44 @@ int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgGetTbIndexFromMnode
(
CTG_PARAMS
,
const
char
*
tbFName
,
SArray
**
out
,
SCtgTask
*
pTask
)
{
char
*
msg
=
NULL
;
int32_t
msgLen
=
0
;
int32_t
reqType
=
TDMT_MND_GET_TABLE_INDEX
;
void
*
(
*
mallocFp
)(
int32_t
)
=
pTask
?
taosMemoryMalloc
:
rpcMallocCont
;
ctgDebug
(
"try to get tb index from mnode, tbFName:%s"
,
tbFName
);
int32_t
code
=
queryBuildMsg
[
TMSG_INDEX
(
reqType
)]((
void
*
)
tbFName
,
&
msg
,
0
,
&
msgLen
,
mallocFp
);
if
(
code
)
{
ctgError
(
"Build get index msg failed, code:%s, tbFName:%s"
,
tstrerror
(
code
),
tbFName
);
CTG_ERR_RET
(
code
);
}
if
(
pTask
)
{
void
*
pOut
=
taosMemoryCalloc
(
1
,
POINTER_BYTES
);
if
(
NULL
==
pOut
)
{
CTG_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
CTG_ERR_RET
(
ctgUpdateMsgCtx
(
&
pTask
->
msgCtx
,
reqType
,
pOut
,
(
char
*
)
tbFName
));
CTG_RET
(
ctgAsyncSendMsg
(
CTG_PARAMS_LIST
(),
pTask
,
reqType
,
msg
,
msgLen
));
}
SRpcMsg
rpcMsg
=
{
.
msgType
=
reqType
,
.
pCont
=
msg
,
.
contLen
=
msgLen
,
};
SRpcMsg
rpcRsp
=
{
0
};
rpcSendRecv
(
pTrans
,
(
SEpSet
*
)
pMgmtEps
,
&
rpcMsg
,
&
rpcRsp
);
CTG_ERR_RET
(
ctgProcessRspMsg
(
out
,
reqType
,
rpcRsp
.
pCont
,
rpcRsp
.
contLen
,
rpcRsp
.
code
,
(
char
*
)
tbFName
));
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgGetUdfInfoFromMnode
(
CTG_PARAMS
,
const
char
*
funcName
,
SFuncInfo
*
out
,
SCtgTask
*
pTask
)
{
char
*
msg
=
NULL
;
int32_t
msgLen
=
0
;
...
...
source/libs/qcom/src/querymsg.c
浏览文件 @
9450c959
...
...
@@ -203,6 +203,24 @@ int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryBuildGetTbIndexMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
,
void
*
(
*
mallcFp
)(
int32_t
))
{
if
(
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
STableIndexReq
indexReq
=
{
0
};
strcpy
(
indexReq
.
tbFName
,
input
);
int32_t
bufLen
=
tSerializeSTableIndexReq
(
NULL
,
0
,
&
indexReq
);
void
*
pBuf
=
(
*
mallcFp
)(
bufLen
);
tSerializeSTableIndexReq
(
pBuf
,
bufLen
,
&
indexReq
);
*
msg
=
pBuf
;
*
msgLen
=
bufLen
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryProcessUseDBRsp
(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
{
SUseDbOutput
*
pOut
=
output
;
...
...
@@ -459,6 +477,22 @@ int32_t queryProcessGetUserAuthRsp(void *output, char *msg, int32_t msgSize) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryProcessGetTbIndexRsp
(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
{
if
(
NULL
==
output
||
NULL
==
msg
||
msgSize
<=
0
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
STableIndexRsp
out
=
{
0
};
if
(
tDeserializeSTableIndexRsp
(
msg
,
msgSize
,
&
out
)
!=
0
)
{
qError
(
"tDeserializeSTableIndexRsp failed, msgSize:%d"
,
msgSize
);
return
TSDB_CODE_INVALID_MSG
;
}
*
(
void
**
)
output
=
out
.
pIndex
;
return
TSDB_CODE_SUCCESS
;
}
void
initQueryModuleMsgHandle
()
{
queryBuildMsg
[
TMSG_INDEX
(
TDMT_VND_TABLE_META
)]
=
queryBuildTableMetaReqMsg
;
...
...
@@ -469,7 +503,7 @@ void initQueryModuleMsgHandle() {
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_GET_INDEX
)]
=
queryBuildGetIndexMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_RETRIEVE_FUNC
)]
=
queryBuildRetrieveFuncMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_GET_USER_AUTH
)]
=
queryBuildGetUserAuthMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_GET_TABLE_INDEX
)]
=
queryBuildGetTbIndexMsg
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_VND_TABLE_META
)]
=
queryProcessTableMetaRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_TABLE_META
)]
=
queryProcessTableMetaRsp
;
...
...
@@ -479,6 +513,7 @@ void initQueryModuleMsgHandle() {
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_GET_INDEX
)]
=
queryProcessGetIndexRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_RETRIEVE_FUNC
)]
=
queryProcessRetrieveFuncRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_GET_USER_AUTH
)]
=
queryProcessGetUserAuthRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_GET_TABLE_INDEX
)]
=
queryProcessGetTbIndexRsp
;
}
#pragma GCC diagnostic pop
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
9450c959
...
...
@@ -207,6 +207,7 @@ typedef struct SSchJob {
SArray
*
dataSrcTasks
;
// SArray<SQueryTask*>
int32_t
levelIdx
;
SEpSet
dataSrcEps
;
SHashObj
*
taskList
;
SHashObj
*
execTasks
;
// executing tasks, key:taskid, value:SQueryTask*
SHashObj
*
succTasks
;
// succeed tasks, key:taskid, value:SQueryTask*
SHashObj
*
failTasks
;
// failed tasks, key:taskid, value:SQueryTask*
...
...
source/libs/scheduler/src/schJob.c
浏览文件 @
9450c959
...
...
@@ -64,6 +64,13 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *pTrans, SArray *pN
pJob
->
nodeList
=
taosArrayDup
(
pNodeList
);
}
pJob
->
taskList
=
taosHashInit
(
pDag
->
numOfSubplans
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
false
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pJob
->
taskList
)
{
SCH_JOB_ELOG
(
"taosHashInit %d taskList failed"
,
pDag
->
numOfSubplans
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SCH_ERR_JRET
(
schValidateAndBuildJob
(
pDag
,
pJob
));
if
(
SCH_IS_EXPLAIN_JOB
(
pJob
))
{
...
...
@@ -486,23 +493,26 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SCH_SET_JOB_TYPE
(
pJob
,
plan
->
subplanType
);
SSchTask
task
=
{
0
};
SSchTask
*
pTask
=
&
task
;
SCH_ERR_JRET
(
schInitTask
(
pJob
,
&
task
,
plan
,
pLevel
));
void
*
p
=
taosArrayPush
(
pLevel
->
subTasks
,
&
task
);
if
(
NULL
==
p
)
{
SSchTask
*
pTask
=
taosArrayPush
(
pLevel
->
subTasks
,
&
task
);
if
(
NULL
==
p
Task
)
{
SCH_TASK_ELOG
(
"taosArrayPush task to level failed, level:%d, taskIdx:%d"
,
pLevel
->
level
,
n
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SCH_ERR_JRET
(
schRecordQueryDataSrc
(
pJob
,
p
));
SCH_ERR_JRET
(
schRecordQueryDataSrc
(
pJob
,
p
Task
));
if
(
0
!=
taosHashPut
(
planToTask
,
&
plan
,
POINTER_BYTES
,
&
p
,
POINTER_BYTES
))
{
if
(
0
!=
taosHashPut
(
planToTask
,
&
plan
,
POINTER_BYTES
,
&
p
Task
,
POINTER_BYTES
))
{
SCH_TASK_ELOG
(
"taosHashPut to planToTaks failed, taskIdx:%d"
,
n
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
if
(
0
!=
taosHashPut
(
pJob
->
taskList
,
&
pTask
->
taskId
,
sizeof
(
pTask
->
taskId
),
&
pTask
,
POINTER_BYTES
))
{
SCH_TASK_ELOG
(
"taosHashPut to taskList failed, taskIdx:%d"
,
n
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
++
pJob
->
taskNum
;
}
...
...
@@ -1276,15 +1286,11 @@ int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTas
}
int32_t
schGetTaskInJob
(
SSchJob
*
pJob
,
uint64_t
taskId
,
SSchTask
**
pTask
)
{
schGetTaskFromList
(
pJob
->
execTasks
,
taskId
,
pTask
);
schGetTaskFromList
(
pJob
->
taskList
,
taskId
,
pTask
);
if
(
NULL
==
*
pTask
)
{
schGetTaskFromList
(
pJob
->
succTasks
,
taskId
,
pTask
);
if
(
NULL
==
*
pTask
)
{
SCH_JOB_ELOG
(
"task not found in execList & succList, taskId:%"
PRIx64
,
taskId
);
SCH_JOB_ELOG
(
"task not found in job task list, taskId:%"
PRIx64
,
taskId
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
}
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1445,6 +1451,7 @@ void schFreeJobImpl(void *job) {
taosHashCleanup
(
pJob
->
execTasks
);
taosHashCleanup
(
pJob
->
failTasks
);
taosHashCleanup
(
pJob
->
succTasks
);
taosHashCleanup
(
pJob
->
taskList
);
taosArrayDestroy
(
pJob
->
levels
);
taosArrayDestroy
(
pJob
->
nodeList
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录