Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a352b935
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
a352b935
编写于
12月 14, 2021
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
update catalog and message
上级
5c7332c2
变更
19
显示空白变更内容
内联
并排
Showing
19 changed file
with
440 addition
and
111 deletion
+440
-111
include/common/taosmsg.h
include/common/taosmsg.h
+15
-4
include/common/tmessage.h
include/common/tmessage.h
+32
-0
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+23
-14
include/util/taoserror.h
include/util/taoserror.h
+1
-1
include/util/tlog.h
include/util/tlog.h
+2
-0
source/client/CMakeLists.txt
source/client/CMakeLists.txt
+1
-0
source/client/src/client.c
source/client/src/client.c
+3
-0
source/common/inc/commonInt.h
source/common/inc/commonInt.h
+15
-1
source/common/src/taosmsg.c
source/common/src/taosmsg.c
+20
-0
source/common/src/tglobal.c
source/common/src/tglobal.c
+1
-0
source/common/src/tmessage.c
source/common/src/tmessage.c
+86
-61
source/dnode/mgmt/impl/src/dndTransport.c
source/dnode/mgmt/impl/src/dndTransport.c
+1
-1
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+17
-9
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+212
-16
source/libs/parser/src/astValidate.c
source/libs/parser/src/astValidate.c
+1
-1
source/libs/parser/src/parser.c
source/libs/parser/src/parser.c
+6
-1
source/libs/transport/src/rpcMain.c
source/libs/transport/src/rpcMain.c
+1
-1
source/util/src/terror.c
source/util/src/terror.c
+1
-1
source/util/src/tlog.c
source/util/src/tlog.c
+2
-0
未找到文件。
include/common/taosmsg.h
浏览文件 @
a352b935
...
...
@@ -77,7 +77,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_FUNCTION, "drop-function" )
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CREATE_STABLE
,
"create-stable"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_ALTER_STABLE
,
"alter-stable"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DROP_STABLE
,
"drop-stable"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_
STABLE_VGROUP
,
"stable-vgroup
"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_
VGROUP_LIST
,
"vgroup-list
"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_KILL_QUERY
,
"kill-query"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_KILL_STREAM
,
"kill-stream"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_KILL_CONN
,
"kill-conn"
)
...
...
@@ -216,7 +216,6 @@ extern char *taosMsg[];
typedef
struct
SBuildTableMetaInput
{
int32_t
vgId
;
STagData
*
tagData
;
char
*
tableFullName
;
}
SBuildTableMetaInput
;
...
...
@@ -776,8 +775,6 @@ typedef struct {
typedef
struct
{
SMsgHead
msgHead
;
char
tableFname
[
TSDB_TABLE_FNAME_LEN
];
int8_t
createFlag
;
char
tags
[];
}
STableInfoMsg
;
typedef
struct
{
...
...
@@ -792,6 +789,20 @@ typedef struct SSTableVgroupMsg {
int32_t
numOfTables
;
}
SSTableVgroupMsg
,
SSTableVgroupRspMsg
;
typedef
struct
SVgroupInfo
{
int32_t
vgId
;
int8_t
numOfEps
;
SEpAddrMsg
epAddr
[
TSDB_MAX_REPLICA
];
}
SVgroupInfo
;
typedef
struct
SVgroupListRspMsg
{
int32_t
vgroupNum
;
int32_t
vgroupVersion
;
SVgroupInfo
vgroupInfo
[];
}
SVgroupListRspMsg
;
typedef
SVgroupListRspMsg
SVgroupListInfo
;
typedef
struct
{
int32_t
vgId
;
int8_t
numOfEps
;
...
...
include/common/tmessage.h
0 → 100644
浏览文件 @
a352b935
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_COMMON_TMESSAGE_H_
#define _TD_COMMON_TMESSAGE_H_
#ifdef __cplusplus
extern
"C"
{
#endif
extern
int32_t
(
*
tscBuildMsg
[
TSDB_MSG_TYPE_MAX
])(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
);
extern
int32_t
(
*
tscProcessMsgRsp
[
TSDB_MSG_TYPE_MAX
])(
void
*
output
,
char
*
msg
,
int32_t
msgSize
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_COMMON_TMESSAGE_H_*/
include/libs/catalog/catalog.h
浏览文件 @
a352b935
...
...
@@ -30,12 +30,6 @@ extern "C" {
struct
SCatalog
;
typedef
struct
SVgroupInfo
{
int32_t
vgId
;
int8_t
numOfEps
;
SEpAddrMsg
epAddr
[
TSDB_MAX_REPLICA
];
}
SVgroupInfo
;
typedef
struct
SDBVgroupInfo
{
int32_t
vgroupVersion
;
SArray
*
vgId
;
...
...
@@ -91,7 +85,11 @@ typedef struct STableMeta {
SSchema
schema
[];
}
STableMeta
;
int32_t
catalogInit
(
SCatalog
*
cfg
);
typedef
struct
SCatalogCfg
{
}
SCatalogCfg
;
int32_t
catalogInit
(
SCatalogCfg
*
cfg
);
/**
* Catalog service object, which is utilized to hold tableMeta (meta/vgroupInfo/udfInfo) at the client-side.
...
...
@@ -101,17 +99,31 @@ int32_t catalogInit(SCatalog *cfg);
*/
int32_t
catalogGetHandle
(
const
char
*
clusterId
,
struct
SCatalog
**
catalogHandle
);
int32_t
catalogGetVgroupVersion
(
struct
SCatalog
*
pCatalog
,
int32_t
*
version
);
int32_t
catalogGetVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
SArray
**
pVgroupList
);
int32_t
catalogUpdateVgroup
(
struct
SCatalog
*
pCatalog
,
SVgroupListInfo
*
pVgroup
);
int32_t
catalogUpdateVgroupList
(
struct
SCatalog
*
pCatalog
,
int32_t
version
,
SArray
*
vgroupList
);
int32_t
catalogGetDBVgroupVersion
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
int32_t
*
version
);
int32_t
catalogGetDBVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
dbName
,
int32_t
forceUpdate
,
SDBVgroupInfo
*
dbInfo
);
int32_t
catalogUpdateDBVgroup
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
SDBVgroupInfo
*
dbInfo
);
int32_t
catalogGetDBVgroupInfo
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
SDBVgroupInfo
*
dbInfo
);
int32_t
catalogUpdateDBVgroupInfo
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
SDBVgroupInfo
*
dbInfo
);
int32_t
catalogGetTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pTableName
,
STableMeta
*
pTableMeta
);
int32_t
catalogRenewTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
STableMeta
*
pTableMeta
);
int32_t
catalogRenewAndGetTableMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
STableMeta
*
pTableMeta
,
STableMeta
*
pNewTableMeta
);
int32_t
catalogGetTableMeta
(
struct
SCatalog
*
pCatalog
,
SRpcObj
*
pRpcObj
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pTableName
,
const
STagData
*
tagData
,
STableMeta
*
pTableMeta
);
/**
* get table's vgroup list.
* @param clusterId
* @pVgroupList - array of SVgroupInfo
* @return
*/
int32_t
catalogGetTableVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pTableName
,
SArray
*
pVgroupList
);
/**
...
...
@@ -125,9 +137,6 @@ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, SRpcObj *pRpcObj, const S
*/
int32_t
catalogGetAllMeta
(
struct
SCatalog
*
pCatalog
,
const
SEpSet
*
pMgmtEps
,
const
SCatalogReq
*
pReq
,
SCatalogRsp
*
pRsp
);
int32_t
catalogRenewTableMeta
(
struct
SCatalog
*
pCatalog
,
const
SEpSet
*
pMgmtEps
,
const
STableMeta
*
pTableMeta
);
int32_t
catalogRenewAndGetTableMeta
(
struct
SCatalog
*
pCatalog
,
const
SEpSet
*
pMgmtEps
,
const
STableMeta
*
pTableMeta
,
STableMeta
*
pNewTableMeta
);
int32_t
catalogGetQnodeList
(
struct
SCatalog
*
pCatalog
,
const
SEpSet
*
pMgmtEps
,
SEpSet
*
pQnodeEpSet
);
...
...
include/util/taoserror.h
浏览文件 @
a352b935
...
...
@@ -492,7 +492,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MON_CONNECTION_INVALID TAOS_DEF_ERROR_CODE(0, 0x2300) //"monitor invalid monitor db connection")
// catalog
#define TSDB_CODE_CTG_INTERNAL_ER
OR
TAOS_DEF_ERROR_CODE(0, 0x2400) //catalog interval error
#define TSDB_CODE_CTG_INTERNAL_ER
ROR
TAOS_DEF_ERROR_CODE(0, 0x2400) //catalog interval error
#define TSDB_CODE_CTG_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x2401) //invalid catalog input parameters
#define TSDB_CODE_CTG_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x2402) //catalog is not ready
#define TSDB_CODE_CTG_MEM_ERROR TAOS_DEF_ERROR_CODE(0, 0x2403) //catalog memory error
...
...
include/util/tlog.h
浏览文件 @
a352b935
...
...
@@ -45,6 +45,8 @@ extern int32_t sDebugFlag;
extern
int32_t
tsdbDebugFlag
;
extern
int32_t
cqDebugFlag
;
extern
int32_t
debugFlag
;
extern
int32_t
ctgDebugFlag
;
#define DEBUG_FATAL 1U
#define DEBUG_ERROR DEBUG_FATAL
...
...
source/client/CMakeLists.txt
浏览文件 @
a352b935
...
...
@@ -6,5 +6,6 @@ target_include_directories(
)
target_link_libraries
(
taos
PRIVATE common
INTERFACE api
)
source/client/src/client.c
浏览文件 @
a352b935
...
...
@@ -18,6 +18,9 @@
//TAOS_RES *taos_query(TAOS *taos, const char *sql) {
//
//}
#include "taosmsg.h"
int
taos_init
()
{
return
0
;
}
void
taos_cleanup
(
void
)
{}
source/common/inc/commonInt.h
浏览文件 @
a352b935
...
...
@@ -20,6 +20,20 @@
extern
"C"
{
#endif
#include "tlog.h"
extern
int32_t
cDebugFlag
;
extern
int8_t
tscEmbedded
;
#define tscFatal(...) do { if (cDebugFlag & DEBUG_FATAL) { taosPrintLog("TSC FATAL ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} while(0)
#define tscError(...) do { if (cDebugFlag & DEBUG_ERROR) { taosPrintLog("TSC ERROR ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} while(0)
#define tscWarn(...) do { if (cDebugFlag & DEBUG_WARN) { taosPrintLog("TSC WARN ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} while(0)
#define tscInfo(...) do { if (cDebugFlag & DEBUG_INFO) { taosPrintLog("TSC ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} while(0)
#define tscDebug(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} while(0)
#define tscTrace(...) do { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} while(0)
#define tscDebugL(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC ", cDebugFlag, __VA_ARGS__); }} while(0)
#ifdef __cplusplus
}
#endif
...
...
source/common/src/taosmsg.c
0 → 100644
浏览文件 @
a352b935
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define TAOS_MESSAGE_C
#include "taosmsg.h"
source/common/src/tglobal.c
浏览文件 @
a352b935
无法预览此类型文件
source/common/src/tmessage.c
浏览文件 @
a352b935
...
...
@@ -13,21 +13,103 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define TAOS_MESSAGE_C
#include "taosmsg.h"
#include "commonint.h"
int32_t
(
*
tscBuildMsg
[
TSDB_MSG_TYPE_MAX
])(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
)
=
{
0
};
int32_t
(
*
tscProcessMsgRsp
[
TSDB_MSG_TYPE_MAX
])(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
=
{
0
};
int32_t
tscBuildVgroupListReqMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
)
{
if
(
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
*
msgLen
=
0
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
tscBuildTableMetaReqMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
)
{
if
(
NULL
==
input
||
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
SBuildTableMetaInput
*
bInput
=
(
SBuildTableMetaInput
*
)
input
;
int32_t
estimateSize
=
sizeof
(
STableInfoMsg
);
if
(
NULL
==
*
msg
||
msgSize
<
estimateSize
)
{
tfree
(
*
msg
);
*
msg
=
calloc
(
1
,
estimateSize
);
if
(
NULL
==
*
msg
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
STableInfoMsg
*
bMsg
=
(
STableInfoMsg
*
)
*
msg
;
bMsg
->
msgHead
.
vgId
=
bInput
->
vgId
;
strncpy
(
bMsg
->
tableFname
,
bInput
->
tableFullName
,
sizeof
(
bMsg
->
tableFname
));
bMsg
->
tableFname
[
sizeof
(
bMsg
->
tableFname
)
-
1
]
=
0
;
*
msgLen
=
(
int32_t
)
sizeof
(
*
bMsg
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
tscProcessVgroupListRsp
(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
{
if
(
NULL
==
output
||
NULL
==
msg
||
msgSize
<=
0
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
SVgroupListRspMsg
*
pRsp
=
(
SVgroupListRspMsg
*
)
msg
;
pRsp
->
vgroupNum
=
htonl
(
pRsp
->
vgroupNum
);
pRsp
->
vgroupVersion
=
htonl
(
pRsp
->
vgroupVersion
);
if
(
pRsp
->
vgroupNum
<
0
)
{
tscError
(
"vgroup number[%d] in rsp is invalid"
,
pRsp
->
vgroupNum
);
return
TSDB_CODE_TSC_VALUE_OUT_OF_RANGE
;
}
if
(
pRsp
->
vgroupVersion
<
0
)
{
tscError
(
"vgroup vgroupVersion[%d] in rsp is invalid"
,
pRsp
->
vgroupVersion
);
return
TSDB_CODE_TSC_VALUE_OUT_OF_RANGE
;
}
if
(
msgSize
!=
(
pRsp
->
vgroupNum
*
sizeof
(
pRsp
->
vgroupInfo
[
0
])
+
sizeof
(
*
pRsp
)))
{
tscError
(
"vgroup list msg size mis-match, msgSize:%d, vgroup number:%d"
,
msgSize
,
pRsp
->
vgroupNum
);
return
TSDB_CODE_TSC_VALUE_OUT_OF_RANGE
;
}
// keep SVgroupListInfo/SVgroupListRspMsg the same
*
(
SVgroupListInfo
**
)
output
=
(
SVgroupListInfo
*
)
msg
;
if
(
pRsp
->
vgroupNum
==
0
)
{
return
TSDB_CODE_SUCCESS
;
}
for
(
int32_t
i
=
0
;
i
<
pRsp
->
vgroupNum
;
++
i
)
{
pRsp
->
vgroupInfo
[
i
].
vgId
=
htonl
(
pRsp
->
vgroupInfo
[
i
].
vgId
);
for
(
int32_t
n
=
0
;
n
<
pRsp
->
vgroupInfo
[
i
].
numOfEps
;
++
n
)
{
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
=
htonl
(
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
);
}
}
return
TSDB_CODE_SUCCESS
;
}
void
msgInit
()
{
tscBuildMsg
[
TSDB_MSG_TYPE_TABLE_META
]
=
tscBuildTableMetaReqMsg
;
tscBuildMsg
[
TSDB_MSG_TYPE_VGROUP_LIST
]
=
tscBuildVgroupListReqMsg
;
tscProcessMsgRsp
[
TSDB_MSG_TYPE_
TABLE_META
]
=
;
//tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = tscProcessTableMetaRsp;
tscProcessMsgRsp
[
TSDB_MSG_TYPE_
VGROUP_LIST
]
=
tscProcessVgroupListRsp
;
/*
tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
...
...
@@ -106,63 +188,6 @@ void msgInit() {
}
char
*
msgSerializeTagData
(
STagData
*
pTagData
,
char
*
pMsg
)
{
int32_t
n
=
(
int32_t
)
strlen
(
pTagData
->
name
);
*
(
int32_t
*
)
pMsg
=
htonl
(
n
);
pMsg
+=
sizeof
(
n
);
memcpy
(
pMsg
,
pTagData
->
name
,
n
);
pMsg
+=
n
;
*
(
int32_t
*
)
pMsg
=
htonl
(
pTagData
->
dataLen
);
pMsg
+=
sizeof
(
int32_t
);
memcpy
(
pMsg
,
pTagData
->
data
,
pTagData
->
dataLen
);
pMsg
+=
pTagData
->
dataLen
;
return
pMsg
;
}
int32_t
tscBuildTableMetaReqMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
)
{
if
(
NULL
==
input
||
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
SBuildTableMetaInput
*
bInput
=
(
SBuildTableMetaInput
*
)
input
;
int32_t
estimateSize
=
sizeof
(
STableInfoMsg
)
+
(
bInput
->
tagData
?
(
sizeof
(
*
bInput
->
tagData
)
+
bInput
->
tagData
->
dataLen
)
:
0
);
if
(
NULL
==
*
msg
||
msgSize
<
estimateSize
)
{
tfree
(
*
msg
);
*
msg
=
calloc
(
1
,
estimateSize
);
if
(
NULL
==
*
msg
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
STableInfoMsg
*
bMsg
=
(
STableInfoMsg
*
)
*
msg
;
bMsg
->
msgHead
.
vgId
=
bInput
->
vgId
;
strncpy
(
bMsg
->
tableFname
,
bInput
->
tableFullName
,
sizeof
(
bMsg
->
tableFname
));
bMsg
->
tableFname
[
sizeof
(
bMsg
->
tableFname
)
-
1
]
=
0
;
int32_t
autoCreate
=
(
bInput
->
tagData
&&
bInput
->
tagData
->
dataLen
>
0
);
bMsg
->
createFlag
=
htons
(
autoCreate
?
1
:
0
);
char
*
pMsg
=
NULL
;
// tag data exists
if
(
autoCreate
)
{
pMsg
=
msgSerializeTagData
(
bInput
->
tagData
,
(
char
*
)
bMsg
->
tags
);
}
*
msgLen
=
(
int32_t
)(
pMsg
-
(
char
*
)
bMsg
);
return
TSDB_CODE_SUCCESS
;
}
source/dnode/mgmt/impl/src/dndTransport.c
浏览文件 @
a352b935
...
...
@@ -72,7 +72,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_CREATE_STABLE
]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_ALTER_STABLE
]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_DROP_STABLE
]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_
STABLE_VGROUP
]
=
dndProcessMnodeReadMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_
VGROUP_LIST
]
=
dndProcessMnodeReadMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_KILL_QUERY
]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_KILL_STREAM
]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_KILL_CONN
]
=
dndProcessMnodeWriteMsg
;
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
a352b935
...
...
@@ -21,13 +21,18 @@ extern "C" {
#endif
#include "catalog.h"
#include "common.h"
#include "tlog.h"
#define CTG_DEFAULT_CLUSTER_NUMBER 3
#define CTG_DEFAULT_CLUSTER_NUMBER 6
#define CTG_DEFAULT_VGROUP_NUMBER 100
#define CTG_DEFAULT_INVALID_VERSION (-1)
typedef
struct
SVgroupListCache
{
int32_t
vgroupNum
;
int32_t
vgroupVersion
;
SHashObj
*
cache
;
//key:vgId, value:SVgroupInfo
SHashObj
*
cache
;
// key:vgId, value:SVgroupInfo*
SArray
*
arrayCache
;
// SVgroupInfo
}
SVgroupListCache
;
typedef
struct
SDBVgroupCache
{
...
...
@@ -50,13 +55,16 @@ typedef struct SCatalogMgmt {
}
SCatalogMgmt
;
extern
int32_t
ctgDebugFlag
;
#define ctgFatal(...) do { if (ctgDebugFlag & DEBUG_FATAL) { taosPrintLog("CTG FATAL ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgError(...) do { if (ctgDebugFlag & DEBUG_ERROR) { taosPrintLog("CTG ERROR ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgWarn(...) do { if (ctgDebugFlag & DEBUG_WARN) { taosPrintLog("CTG WARN ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgInfo(...) do { if (ctgDebugFlag & DEBUG_INFO) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgDebug(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgTrace(...) do { if (ctgDebugFlag & DEBUG_TRACE) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgDebugL(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLongString("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgFatal(...) tscFatal(__VA_ARGS__)
#define ctgError(...) tscError(__VA_ARGS__)
#define ctgWarn(...) tscWarn(__VA_ARGS__)
#define ctgInfo(...) tscInfo(__VA_ARGS__)
#define ctgDebug(...) tscDebug(__VA_ARGS__)
#define ctgTrace(...) tscTrace(__VA_ARGS__)
#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { return _code; } } while (0)
#define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); return _code; } } while (0)
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
a352b935
...
...
@@ -14,53 +14,248 @@
*/
#include "catalogInt.h"
#include "trpc.h"
#include "tmessage.h"
SCatalogMgmt
ctgMgmt
=
{
0
};
int32_t
ctgGetVgroupFromMnode
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
SVgroupListInfo
**
pVgroup
)
{
char
*
msg
=
NULL
;
SEpSet
*
pVnodeEpSet
=
NULL
;
int32_t
msgLen
=
0
;
int32_t
code
=
tscBuildMsg
[
TSDB_MSG_TYPE_VGROUP_LIST
](
NULL
,
&
msg
,
0
,
&
msgLen
);
if
(
code
)
{
return
code
;
}
SRpcMsg
rpcMsg
=
{
.
msgType
=
TSDB_MSG_TYPE_VGROUP_LIST
,
.
pCont
=
msg
,
.
contLen
=
msgLen
,
};
SRpcMsg
rpcRsp
=
{
0
};
rpcSendRecv
(
pRpc
,
(
SEpSet
*
)
pMgmtEps
,
&
rpcMsg
,
&
rpcRsp
);
code
=
tscProcessMsgRsp
[
TSDB_MSG_TYPE_VGROUP_LIST
](
pVgroup
,
rpcRsp
.
pCont
,
rpcRsp
.
contLen
);
if
(
code
)
{
return
code
;
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgGetVgroupFromCache
(
SCatalog
*
pCatalog
,
SArray
**
pVgroupList
,
int32_t
*
exist
)
{
if
(
NULL
==
pCatalog
->
vgroupCache
.
arrayCache
||
pCatalog
->
vgroupCache
.
vgroupVersion
<
0
)
{
*
exist
=
0
;
return
TSDB_CODE_SUCCESS
;
}
if
(
pVgroupList
)
{
*
pVgroupList
=
taosArrayDup
(
pCatalog
->
vgroupCache
.
arrayCache
);
}
*
exist
=
1
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
catalogInit
(
SCatalog
*
cfg
)
{
int32_t
catalogInit
(
SCatalogCfg
*
cfg
)
{
ctgMgmt
.
pCluster
=
taosHashInit
(
CTG_DEFAULT_CLUSTER_NUMBER
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
ctgMgmt
.
pCluster
)
{
CTG_ERR_LRET
(
TSDB_CODE_CTG_INTERNAL_EROR
,
"init %d cluster cache failed"
,
CTG_DEFAULT_CLUSTER_NUMBER
);
CTG_ERR_LRET
(
TSDB_CODE_CTG_INTERNAL_ER
R
OR
,
"init %d cluster cache failed"
,
CTG_DEFAULT_CLUSTER_NUMBER
);
}
return
TSDB_CODE_SUCCESS
;
}
struct
SCatalog
*
catalogGetHandle
(
const
char
*
clusterId
)
{
if
(
NULL
==
clusterId
)
{
return
NULL
;
int32_t
catalogGetHandle
(
const
char
*
clusterId
,
struct
SCatalog
**
catalogHandle
)
{
if
(
NULL
==
clusterId
||
NULL
==
catalogHandle
)
{
return
TSDB_CODE_CTG_INVALID_INPUT
;
}
if
(
NULL
==
ctgMgmt
.
pCluster
)
{
ctgError
(
"cluster cache are not ready"
);
return
NULL
;
return
TSDB_CODE_CTG_NOT_READY
;
}
size_t
clen
=
strlen
(
clusterId
);
SCatalog
*
clusterCtg
=
(
SCatalog
*
)
taosHashGet
(
ctgMgmt
.
pCluster
,
clusterId
,
clen
);
if
(
clusterCtg
)
{
return
clusterCtg
;
*
catalogHandle
=
clusterCtg
;
return
TSDB_CODE_SUCCESS
;
}
clusterCtg
=
calloc
(
1
,
sizeof
(
*
clusterCtg
));
if
(
NULL
==
clusterCtg
)
{
ctgError
(
"calloc %d failed"
,
sizeof
(
*
clusterCtg
));
return
NULL
;
ctgError
(
"calloc %d failed"
,
(
int32_t
)
sizeof
(
*
clusterCtg
));
return
TSDB_CODE_CTG_MEM_ERROR
;
}
clusterCtg
->
vgroupCache
.
vgroupVersion
=
CTG_DEFAULT_INVALID_VERSION
;
if
(
taosHashPut
(
ctgMgmt
.
pCluster
,
clusterId
,
clen
,
&
clusterCtg
,
POINTER_BYTES
))
{
ctgError
(
"put cluster %s cache to hash failed"
,
clusterId
);
tfree
(
clusterCtg
);
return
NULL
;
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
}
*
catalogHandle
=
clusterCtg
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
catalogGetVgroupVersion
(
struct
SCatalog
*
pCatalog
,
int32_t
*
version
)
{
if
(
NULL
==
pCatalog
||
NULL
==
version
)
{
return
TSDB_CODE_CTG_INVALID_INPUT
;
}
return
clusterCtg
;
*
version
=
pCatalog
->
vgroupCache
.
vgroupVersion
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
catalogGetTableMeta
(
struct
SCatalog
*
pCatalog
,
SRpcObj
*
pRpcObj
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pTableName
,
const
STagData
*
tagData
,
STableMeta
*
pTableMeta
)
{
int32_t
catalogUpdateVgroup
(
struct
SCatalog
*
pCatalog
,
SVgroupListInfo
*
pVgroup
)
{
if
(
NULL
==
pVgroup
)
{
ctgError
(
"vgroup get from mnode succeed, but no output"
);
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
}
if
(
pVgroup
->
vgroupVersion
<
0
)
{
ctgError
(
"vgroup version[%d] is invalid"
,
pVgroup
->
vgroupVersion
);
return
TSDB_CODE_CTG_INVALID_INPUT
;
}
if
(
NULL
==
pCatalog
->
vgroupCache
.
arrayCache
)
{
pCatalog
->
vgroupCache
.
arrayCache
=
taosArrayInit
(
pVgroup
->
vgroupNum
,
sizeof
(
pVgroup
->
vgroupInfo
[
0
]));
if
(
NULL
==
pCatalog
->
vgroupCache
.
arrayCache
)
{
ctgError
(
"init array[%d] for cluster cache failed"
,
pVgroup
->
vgroupNum
);
return
TSDB_CODE_CTG_MEM_ERROR
;
}
}
else
{
taosArrayClear
(
pCatalog
->
vgroupCache
.
arrayCache
);
}
if
(
NULL
==
pCatalog
->
vgroupCache
.
cache
)
{
pCatalog
->
vgroupCache
.
cache
=
taosHashInit
(
CTG_DEFAULT_VGROUP_NUMBER
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pCatalog
->
vgroupCache
.
cache
)
{
ctgError
(
"init hash[%d] for cluster cache failed"
,
CTG_DEFAULT_VGROUP_NUMBER
);
return
TSDB_CODE_CTG_MEM_ERROR
;
}
}
else
{
taosHashClear
(
pCatalog
->
vgroupCache
.
cache
);
}
SVgroupInfo
*
vInfo
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
vgroupNum
;
++
i
)
{
vInfo
=
taosArrayPush
(
pCatalog
->
vgroupCache
.
arrayCache
,
&
pVgroup
->
vgroupInfo
[
i
]);
if
(
NULL
==
vInfo
)
{
ctgError
(
"push to vgroup array cache failed"
);
goto
error_exit
;
}
if
(
taosHashPut
(
pCatalog
->
vgroupCache
.
cache
,
&
pVgroup
->
vgroupInfo
[
i
].
vgId
,
sizeof
(
pVgroup
->
vgroupInfo
[
i
].
vgId
),
&
vInfo
,
POINTER_BYTES
)
!=
0
)
{
ctgError
(
"push to vgroup hash cache failed"
);
goto
error_exit
;
}
}
pCatalog
->
vgroupCache
.
vgroupVersion
=
pVgroup
->
vgroupVersion
;
return
TSDB_CODE_SUCCESS
;
error_exit:
if
(
pCatalog
->
vgroupCache
.
arrayCache
)
{
taosArrayDestroy
(
pCatalog
->
vgroupCache
.
arrayCache
);
pCatalog
->
vgroupCache
.
arrayCache
=
NULL
;
}
if
(
pCatalog
->
vgroupCache
.
cache
)
{
taosHashCleanup
(
pCatalog
->
vgroupCache
.
cache
);
pCatalog
->
vgroupCache
.
cache
=
NULL
;
}
pCatalog
->
vgroupCache
.
vgroupVersion
=
CTG_DEFAULT_INVALID_VERSION
;
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
}
int32_t
catalogGetVgroup
(
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
SArray
**
pVgroupList
)
{
if
(
NULL
==
pCatalog
||
NULL
==
pMgmtEps
||
NULL
==
pRpc
)
{
return
TSDB_CODE_CTG_INVALID_INPUT
;
}
int32_t
exist
=
0
;
CTG_ERR_RET
(
ctgGetVgroupFromCache
(
pCatalog
,
pVgroupList
,
&
exist
));
if
(
exist
)
{
return
TSDB_CODE_SUCCESS
;
}
SVgroupListInfo
*
pVgroup
=
NULL
;
CTG_ERR_RET
(
ctgGetVgroupFromMnode
(
pCatalog
,
pRpc
,
pMgmtEps
,
&
pVgroup
));
CTG_ERR_RET
(
catalogUpdateVgroup
(
pCatalog
,
pVgroup
));
if
(
pVgroupList
)
{
CTG_ERR_RET
(
ctgGetVgroupFromCache
(
pCatalog
,
pVgroupList
,
&
exist
));
}
if
(
0
==
exist
)
{
ctgError
(
"catalog fetched but get from cache failed"
);
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
catalogGetDBVgroupVersion
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
int32_t
*
version
)
{
if
(
NULL
==
pCatalog
||
NULL
==
dbName
||
NULL
==
version
)
{
return
TSDB_CODE_CTG_INVALID_INPUT
;
}
if
(
NULL
==
pCatalog
->
dbCache
.
cache
)
{
*
version
=
CTG_DEFAULT_INVALID_VERSION
;
return
TSDB_CODE_SUCCESS
;
}
SDBVgroupInfo
*
dbInfo
=
taosHashGet
(
pCatalog
->
dbCache
.
cache
,
dbName
,
strlen
(
dbName
));
if
(
NULL
==
dbInfo
)
{
*
version
=
CTG_DEFAULT_INVALID_VERSION
;
return
TSDB_CODE_SUCCESS
;
}
*
version
=
dbInfo
->
vgroupVersion
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
catalogGetDBVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
dbName
,
int32_t
forceUpdate
,
SDBVgroupInfo
*
dbInfo
)
{
}
int32_t
catalogUpdateDBVgroup
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
SDBVgroupInfo
*
dbInfo
)
{
}
int32_t
catalogGetTableMetaFromMnode
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pTableName
,
const
STagData
*
tagData
,
STableMeta
*
pTableMeta
)
{
if
(
NULL
==
pCatalog
||
NULL
==
pMgmtEps
||
NULL
==
pTableName
||
NULL
==
pTableMeta
)
{
return
TSDB_CODE_CTG_INVALID_INPUT
;
}
...
...
@@ -79,12 +274,13 @@ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, SRpcObj *pRpcObj, const S
.
msgType
=
TSDB_MSG_TYPE_TABLE_META
,
.
pCont
=
msg
,
.
contLen
=
msgLen
,
.
ahandle
=
(
void
*
)
pSql
->
self
,
.
handle
=
NULL
,
.
code
=
0
};
rpcSendRequest
(
pRpcObj
->
pDnodeConn
,
pVnodeEpSet
,
&
rpcMsg
,
&
pSql
->
rpcRid
);
SRpcMsg
rpcRsp
=
{
0
};
rpcSendRecv
(
pRpc
,
(
SEpSet
*
)
pMgmtEps
,
&
rpcMsg
,
&
rpcRsp
);
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/parser/src/astValidate.c
浏览文件 @
a352b935
...
...
@@ -4081,7 +4081,7 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer
SCatalogRsp
data
=
{
0
};
// TODO: check if the qnode info has been cached already
req
.
qNode
Epset
=
true
;
req
.
qNode
Required
=
true
;
code
=
qParserExtractRequestedMetaInfo
(
pInfo
,
&
req
,
msgBuf
,
msgBufLen
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
source/libs/parser/src/parser.c
浏览文件 @
a352b935
...
...
@@ -42,7 +42,12 @@ int32_t qParseQuerySql(const char* pStr, size_t length, struct SQueryStmtInfo**
return
TSDB_CODE_TSC_SQL_SYNTAX_ERROR
;
}
struct
SCatalog
*
pCatalog
=
catalogGetHandle
(
NULL
);
struct
SCatalog
*
pCatalog
=
NULL
;
int32_t
code
=
catalogGetHandle
(
NULL
,
&
pCatalog
);
if
(
code
)
{
return
code
;
}
return
qParserValidateSqlNode
(
pCatalog
,
&
info
,
*
pQueryInfo
,
id
,
msg
,
msgLen
);
}
...
...
source/libs/transport/src/rpcMain.c
浏览文件 @
a352b935
...
...
@@ -406,7 +406,7 @@ void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t
// for TDengine, all the query, show commands shall have TCP connection
char
type
=
pMsg
->
msgType
;
if
(
type
==
TSDB_MSG_TYPE_QUERY
||
type
==
TSDB_MSG_TYPE_SHOW_RETRIEVE
||
type
==
TSDB_MSG_TYPE_FETCH
||
type
==
TSDB_MSG_TYPE_
STABLE_VGROUP
||
type
==
TSDB_MSG_TYPE_FETCH
||
type
==
TSDB_MSG_TYPE_
VGROUP_LIST
||
type
==
TSDB_MSG_TYPE_TABLES_META
||
type
==
TSDB_MSG_TYPE_TABLE_META
||
type
==
TSDB_MSG_TYPE_SHOW
||
type
==
TSDB_MSG_TYPE_STATUS
||
type
==
TSDB_MSG_TYPE_ALTER_TABLE
)
pContext
->
connType
=
RPC_CONN_TCPC
;
...
...
source/util/src/terror.c
浏览文件 @
a352b935
...
...
@@ -498,7 +498,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_LEVEL, "tfs invalid level")
TAOS_DEFINE_ERROR
(
TSDB_CODE_FS_NO_VALID_DISK
,
"tfs no valid disk"
)
// catalog
TAOS_DEFINE_ERROR
(
TSDB_CODE_CTG_INTERNAL_ER
OR
,
"catalog interval error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_CTG_INTERNAL_ER
ROR
,
"catalog interval error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_CTG_INVALID_INPUT
,
"invalid catalog input parameters"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_CTG_NOT_READY
,
"catalog is not ready"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_CTG_MEM_ERROR
,
"catalog memory error"
)
...
...
source/util/src/tlog.c
浏览文件 @
a352b935
...
...
@@ -99,6 +99,8 @@ int32_t wDebugFlag = 135;
int32_t
tsdbDebugFlag
=
131
;
int32_t
cqDebugFlag
=
131
;
int32_t
fsDebugFlag
=
135
;
int32_t
ctgDebugFlag
=
131
;
int64_t
dbgEmptyW
=
0
;
int64_t
dbgWN
=
0
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录