Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2164b8ef
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
2164b8ef
编写于
12月 10, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
TD-10431 create stable
上级
90512f6e
变更
16
隐藏空白更改
内联
并排
Showing
16 changed file
with
328 addition
and
195 deletion
+328
-195
include/common/taosmsg.h
include/common/taosmsg.h
+17
-14
include/dnode/mnode/sdb/sdb.h
include/dnode/mnode/sdb/sdb.h
+1
-1
include/util/taoserror.h
include/util/taoserror.h
+12
-2
source/dnode/mgmt/impl/src/dndTransport.c
source/dnode/mgmt/impl/src/dndTransport.c
+10
-10
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+3
-4
source/dnode/mnode/impl/inc/mndStable.h
source/dnode/mnode/impl/inc/mndStable.h
+2
-2
source/dnode/mnode/impl/src/mndShow.c
source/dnode/mnode/impl/src/mndShow.c
+1
-1
source/dnode/mnode/impl/src/mndStable.c
source/dnode/mnode/impl/src/mndStable.c
+271
-150
source/dnode/mnode/impl/src/mnode.c
source/dnode/mnode/impl/src/mnode.c
+1
-1
source/libs/parser/inc/astGenerator.h
source/libs/parser/inc/astGenerator.h
+1
-1
source/libs/parser/inc/sql.y
source/libs/parser/inc/sql.y
+2
-2
source/libs/parser/src/astGenerator.c
source/libs/parser/src/astGenerator.c
+1
-1
source/libs/parser/src/sql.c
source/libs/parser/src/sql.c
+2
-2
source/libs/transport/src/rpcMain.c
source/libs/transport/src/rpcMain.c
+1
-1
source/util/src/terror.c
source/util/src/terror.c
+2
-2
src/client/inc/tscParseLine.h
src/client/inc/tscParseLine.h
+1
-1
未找到文件。
include/common/taosmsg.h
浏览文件 @
2164b8ef
...
...
@@ -74,10 +74,10 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_DB, "compact-db" )
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CREATE_FUNCTION
,
"create-function"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_RETRIEVE_FUNCTION
,
"retrieve-function"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DROP_FUNCTION
,
"drop-function"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CREATE_ST
ABLE
,
"create-stable
"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_ALTER_ST
ABLE
,
"alter-stable
"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DROP_ST
ABLE
,
"drop-stable
"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_ST
ABLE_VGROUP
,
"stable
-vgroup"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CREATE_ST
B
,
"create-stb
"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_ALTER_ST
B
,
"alter-stb
"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DROP_ST
B
,
"drop-stb
"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_ST
B_VGROUP
,
"stb
-vgroup"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_KILL_QUERY
,
"kill-query"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_KILL_STREAM
,
"kill-stream"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_KILL_CONN
,
"kill-conn"
)
...
...
@@ -94,9 +94,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" )
// message from vnode to dnode
// message from mnode to vnode
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CREATE_ST
ABLE_IN
,
"create-stable
"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_ALTER_ST
ABLE_IN
,
"alter-stable
"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DROP_ST
ABLE_IN
,
"drop-stable
"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_CREATE_ST
B_IN
,
"create-stb-in
"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_ALTER_ST
B_IN
,
"alter-stb-in
"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DROP_ST
B_IN
,
"drop-stb-in
"
)
// message from mnode to mnode
// message from mnode to qnode
// message from mnode to dnode
...
...
@@ -159,7 +159,7 @@ typedef enum _mgmt_table {
TSDB_MGMT_TABLE_DNODE
,
TSDB_MGMT_TABLE_MNODE
,
TSDB_MGMT_TABLE_VGROUP
,
TSDB_MGMT_TABLE_ST
ABLE
,
TSDB_MGMT_TABLE_ST
B
,
TSDB_MGMT_TABLE_MODULE
,
TSDB_MGMT_TABLE_QUERIES
,
TSDB_MGMT_TABLE_STREAMS
,
...
...
@@ -294,7 +294,7 @@ typedef struct {
uint64_t
superTableUid
;
uint64_t
createdTime
;
char
tableFname
[
TSDB_TABLE_FNAME_LEN
];
char
st
able
Fname
[
TSDB_TABLE_FNAME_LEN
];
char
st
b
Fname
[
TSDB_TABLE_FNAME_LEN
];
char
data
[];
}
SMDCreateTableMsg
;
...
...
@@ -311,9 +311,12 @@ typedef struct {
}
SCreateTableMsg
;
typedef
struct
{
int32_t
numOfTables
;
int32_t
contLen
;
}
SCMCreateTableMsg
;
char
name
[
TSDB_TABLE_FNAME_LEN
];
int8_t
igExists
;
int32_t
numOfTags
;
int32_t
numOfColumns
;
SSchema
pSchema
[];
}
SCreateStbMsg
;
typedef
struct
{
char
name
[
TSDB_TABLE_FNAME_LEN
];
...
...
@@ -765,7 +768,7 @@ typedef struct {
typedef
struct
{
char
name
[
TSDB_TABLE_FNAME_LEN
];
}
SSt
able
InfoMsg
;
}
SSt
b
InfoMsg
;
typedef
struct
{
char
tableFname
[
TSDB_TABLE_FNAME_LEN
];
...
...
@@ -798,7 +801,7 @@ typedef struct {
typedef
struct
{
char
tableFname
[
TSDB_TABLE_FNAME_LEN
];
// table id
char
st
able
Fname
[
TSDB_TABLE_FNAME_LEN
];
char
st
b
Fname
[
TSDB_TABLE_FNAME_LEN
];
int32_t
numOfTags
;
int32_t
numOfColumns
;
int8_t
precision
;
...
...
include/dnode/mnode/sdb/sdb.h
浏览文件 @
2164b8ef
...
...
@@ -159,7 +159,7 @@ typedef enum {
SDB_AUTH
=
6
,
SDB_ACCT
=
7
,
SDB_VGROUP
=
9
,
SDB_ST
ABLE
=
9
,
SDB_ST
B
=
9
,
SDB_DB
=
10
,
SDB_FUNC
=
11
,
SDB_MAX
=
12
...
...
include/util/taoserror.h
浏览文件 @
2164b8ef
...
...
@@ -183,8 +183,18 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_MNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0348) //"Mnode already exists")
#define TSDB_CODE_MND_MNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0349) //"Mnode not there")
// mnode-table
#define TSDB_CODE_MND_TABLE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0360) //"Table already exists")
// mnode-stable
#define TSDB_CODE_MND_STB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0360)
#define TSDB_CODE_MND_STB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0360)
#define TSDB_CODE_MND_STB_INVALID_IGEXIST TAOS_DEF_ERROR_CODE(0, 0x0360)
#define TSDB_CODE_MND_STB_INVALID_COLS_NUM TAOS_DEF_ERROR_CODE(0, 0x0360)
#define TSDB_CODE_MND_STB_INVALID_TAGS_NUM TAOS_DEF_ERROR_CODE(0, 0x0360)
#define TSDB_CODE_MND_STB_INVALID_COL_TYPE TAOS_DEF_ERROR_CODE(0, 0x0360)
#define TSDB_CODE_MND_STB_INVALID_COL_ID TAOS_DEF_ERROR_CODE(0, 0x0360)
#define TSDB_CODE_MND_STB_INVALID_COL_BYTES TAOS_DEF_ERROR_CODE(0, 0x0360)
#define TSDB_CODE_MND_STB_INVALID_COL_NAME TAOS_DEF_ERROR_CODE(0, 0x0360)
#define TSDB_CODE_MND_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0361) //"Table name too long")
#define TSDB_CODE_MND_INVALID_TABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x0362) //"Table does not exist")
#define TSDB_CODE_MND_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0363) //"Invalid table type in tsdb")
...
...
source/dnode/mgmt/impl/src/dndTransport.c
浏览文件 @
2164b8ef
...
...
@@ -69,10 +69,10 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_CREATE_FUNCTION
]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_RETRIEVE_FUNCTION
]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_DROP_FUNCTION
]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_CREATE_ST
ABLE
]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_ALTER_ST
ABLE
]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_DROP_ST
ABLE
]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_ST
ABLE
_VGROUP
]
=
dndProcessMnodeReadMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_CREATE_ST
B
]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_ALTER_ST
B
]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_DROP_ST
B
]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_ST
B
_VGROUP
]
=
dndProcessMnodeReadMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_KILL_QUERY
]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_KILL_STREAM
]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_KILL_CONN
]
=
dndProcessMnodeWriteMsg
;
...
...
@@ -84,12 +84,12 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_NETWORK_TEST
]
=
dndProcessDnodeReq
;
// message from mnode to vnode
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_CREATE_ST
ABLE
_IN
]
=
dndProcessVnodeWriteMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_CREATE_ST
ABLE
_IN_RSP
]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_ALTER_ST
ABLE
_IN
]
=
dndProcessVnodeWriteMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_ALTER_ST
ABLE
_IN_RSP
]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_DROP_ST
ABLE
_IN
]
=
dndProcessVnodeWriteMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_DROP_ST
ABLE
_IN_RSP
]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_CREATE_ST
B
_IN
]
=
dndProcessVnodeWriteMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_CREATE_ST
B
_IN_RSP
]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_ALTER_ST
B
_IN
]
=
dndProcessVnodeWriteMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_ALTER_ST
B
_IN_RSP
]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_DROP_ST
B
_IN
]
=
dndProcessVnodeWriteMsg
;
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_DROP_ST
B
_IN_RSP
]
=
dndProcessMnodeWriteMsg
;
// message from mnode to dnode
pMgmt
->
msgFp
[
TSDB_MSG_TYPE_CREATE_VNODE_IN
]
=
dndProcessVnodeMgmtMsg
;
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
2164b8ef
...
...
@@ -241,7 +241,7 @@ typedef struct SVgObj {
SVnodeGid
vnodeGid
[
TSDB_MAX_REPLICA
];
}
SVgObj
;
typedef
struct
SStableObj
{
typedef
struct
{
char
name
[
TSDB_TABLE_FNAME_LEN
];
char
db
[
TSDB_FULL_DB_NAME_LEN
];
int64_t
createdTime
;
...
...
@@ -251,9 +251,8 @@ typedef struct SStableObj {
int32_t
numOfColumns
;
int32_t
numOfTags
;
SRWLatch
lock
;
SSchema
*
columnSchema
;
SSchema
*
tagSchema
;
}
SStableObj
;
SSchema
*
pSchema
;
}
SStbObj
;
typedef
struct
SFuncObj
{
char
name
[
TSDB_FUNC_NAME_LEN
];
...
...
source/dnode/mnode/impl/inc/mndStable.h
浏览文件 @
2164b8ef
...
...
@@ -22,8 +22,8 @@
extern
"C"
{
#endif
int32_t
mndInitSt
able
(
SMnode
*
pMnode
);
void
mndCleanupSt
able
(
SMnode
*
pMnode
);
int32_t
mndInitSt
b
(
SMnode
*
pMnode
);
void
mndCleanupSt
b
(
SMnode
*
pMnode
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/src/mndShow.c
浏览文件 @
2164b8ef
...
...
@@ -270,7 +270,7 @@ char *mndShowStr(int32_t showType) {
return
"show mnodes"
;
case
TSDB_MGMT_TABLE_VGROUP
:
return
"show vgroups"
;
case
TSDB_MGMT_TABLE_ST
ABLE
:
case
TSDB_MGMT_TABLE_ST
B
:
return
"show stables"
;
case
TSDB_MGMT_TABLE_MODULE
:
return
"show modules"
;
...
...
source/dnode/mnode/impl/src/mndStable.c
浏览文件 @
2164b8ef
...
...
@@ -15,62 +15,62 @@
#define _DEFAULT_SOURCE
#include "mndStable.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndShow.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndDb.h"
#include "tname.h"
#define TSDB_ST
ABLE
_VER_NUM 1
#define TSDB_ST
ABLE
_RESERVE_SIZE 64
static
SSdbRaw
*
mndSt
ableActionEncode
(
SStable
Obj
*
pStb
);
static
SSdbRow
*
mndSt
able
ActionDecode
(
SSdbRaw
*
pRaw
);
static
int32_t
mndSt
ableActionInsert
(
SSdb
*
pSdb
,
SStable
Obj
*
pStb
);
static
int32_t
mndSt
ableActionDelete
(
SSdb
*
pSdb
,
SStable
Obj
*
pStb
);
static
int32_t
mndSt
ableActionUpdate
(
SSdb
*
pSdb
,
SStableObj
*
pOldStb
,
SStable
Obj
*
pNewStb
);
static
int32_t
mndProcessCreateSt
able
Msg
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessAlterSt
able
Msg
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessDropSt
able
Msg
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessCreateSt
able
InRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessAlterSt
able
InRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessDropSt
able
InRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessSt
able
MetaMsg
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndGetSt
able
Meta
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
STableMetaMsg
*
pMeta
);
static
int32_t
mndRetrieveSt
ables
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
);
static
void
mndCancelGetNextSt
able
(
SMnode
*
pMnode
,
void
*
pIter
);
int32_t
mndInitSt
able
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_ST
ABLE
,
#define TSDB_ST
B
_VER_NUM 1
#define TSDB_ST
B
_RESERVE_SIZE 64
static
SSdbRaw
*
mndSt
bActionEncode
(
SStb
Obj
*
pStb
);
static
SSdbRow
*
mndSt
b
ActionDecode
(
SSdbRaw
*
pRaw
);
static
int32_t
mndSt
bActionInsert
(
SSdb
*
pSdb
,
SStb
Obj
*
pStb
);
static
int32_t
mndSt
bActionDelete
(
SSdb
*
pSdb
,
SStb
Obj
*
pStb
);
static
int32_t
mndSt
bActionUpdate
(
SSdb
*
pSdb
,
SStbObj
*
pOldStb
,
SStb
Obj
*
pNewStb
);
static
int32_t
mndProcessCreateSt
b
Msg
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessAlterSt
b
Msg
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessDropSt
b
Msg
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessCreateSt
b
InRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessAlterSt
b
InRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessDropSt
b
InRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessSt
b
MetaMsg
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndGetSt
b
Meta
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
STableMetaMsg
*
pMeta
);
static
int32_t
mndRetrieveSt
b
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
);
static
void
mndCancelGetNextSt
b
(
SMnode
*
pMnode
,
void
*
pIter
);
int32_t
mndInitSt
b
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_ST
B
,
.
keyType
=
SDB_KEY_BINARY
,
.
encodeFp
=
(
SdbEncodeFp
)
mndSt
able
ActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndSt
able
ActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndSt
able
ActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndSt
able
ActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndSt
able
ActionDelete
};
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_CREATE_ST
ABLE
,
mndProcessCreateStable
Msg
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_ALTER_ST
ABLE
,
mndProcessAlterStable
Msg
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_DROP_ST
ABLE
,
mndProcessDropStable
Msg
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_CREATE_ST
ABLE_IN_RSP
,
mndProcessCreateStable
InRsp
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_ALTER_ST
ABLE_IN_RSP
,
mndProcessAlterStable
InRsp
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_DROP_ST
ABLE_IN_RSP
,
mndProcessDropStable
InRsp
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_TABLE_META
,
mndProcessSt
able
MetaMsg
);
mndAddShowMetaHandle
(
pMnode
,
TSDB_MGMT_TABLE_ST
ABLE
,
mndGetStable
Meta
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_ST
ABLE
,
mndRetrieveStables
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_ST
ABLE
,
mndCancelGetNextStable
);
.
encodeFp
=
(
SdbEncodeFp
)
mndSt
b
ActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndSt
b
ActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndSt
b
ActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndSt
b
ActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndSt
b
ActionDelete
};
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_CREATE_ST
B
,
mndProcessCreateStb
Msg
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_ALTER_ST
B
,
mndProcessAlterStb
Msg
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_DROP_ST
B
,
mndProcessDropStb
Msg
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_CREATE_ST
B_IN_RSP
,
mndProcessCreateStb
InRsp
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_ALTER_ST
B_IN_RSP
,
mndProcessAlterStb
InRsp
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_DROP_ST
B_IN_RSP
,
mndProcessDropStb
InRsp
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_TABLE_META
,
mndProcessSt
b
MetaMsg
);
mndAddShowMetaHandle
(
pMnode
,
TSDB_MGMT_TABLE_ST
B
,
mndGetStb
Meta
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_ST
B
,
mndRetrieveStb
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_ST
B
,
mndCancelGetNextStb
);
return
sdbSetTable
(
pMnode
->
pSdb
,
table
);
}
void
mndCleanupSt
able
(
SMnode
*
pMnode
)
{}
void
mndCleanupSt
b
(
SMnode
*
pMnode
)
{}
static
SSdbRaw
*
mndSt
ableActionEncode
(
SStable
Obj
*
pStb
)
{
int32_t
size
=
sizeof
(
SSt
able
Obj
)
+
(
pStb
->
numOfColumns
+
pStb
->
numOfTags
)
*
sizeof
(
SSchema
);
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_ST
ABLE
,
TSDB_STABLE
_VER_NUM
,
size
);
static
SSdbRaw
*
mndSt
bActionEncode
(
SStb
Obj
*
pStb
)
{
int32_t
size
=
sizeof
(
SSt
b
Obj
)
+
(
pStb
->
numOfColumns
+
pStb
->
numOfTags
)
*
sizeof
(
SSchema
);
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_ST
B
,
TSDB_STB
_VER_NUM
,
size
);
if
(
pRaw
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
...
...
@@ -82,41 +82,34 @@ static SSdbRaw *mndStableActionEncode(SStableObj *pStb) {
SDB_SET_INT32
(
pRaw
,
dataPos
,
pStb
->
numOfColumns
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pStb
->
numOfTags
)
for
(
int32_t
i
=
0
;
i
<
pStb
->
numOfColumns
;
++
i
)
{
SSchema
*
pSchema
=
&
pStb
->
columnSchema
[
i
];
SDB_SET_INT8
(
pRaw
,
dataPos
,
pSchema
->
type
);
SDB_SET_INT32
(
pRaw
,
dataPos
,
pSchema
->
colId
);
SDB_SET_INT32
(
pRaw
,
dataPos
,
pSchema
->
bytes
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pSchema
->
name
,
TSDB_COL_NAME_LEN
);
}
for
(
int32_t
i
=
0
;
i
<
pStb
->
numOfTags
;
++
i
)
{
SSchema
*
pSchema
=
&
pStb
->
tagSchema
[
i
];
int32_t
totalCols
=
pStb
->
numOfColumns
+
pStb
->
numOfTags
;
for
(
int32_t
i
=
0
;
i
<
totalCols
;
++
i
)
{
SSchema
*
pSchema
=
&
pStb
->
pSchema
[
i
];
SDB_SET_INT8
(
pRaw
,
dataPos
,
pSchema
->
type
);
SDB_SET_INT32
(
pRaw
,
dataPos
,
pSchema
->
colId
);
SDB_SET_INT32
(
pRaw
,
dataPos
,
pSchema
->
bytes
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pSchema
->
name
,
TSDB_COL_NAME_LEN
);
}
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
TSDB_ST
ABLE
_RESERVE_SIZE
)
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
TSDB_ST
B
_RESERVE_SIZE
)
SDB_SET_DATALEN
(
pRaw
,
dataPos
);
return
pRaw
;
}
static
SSdbRow
*
mndSt
able
ActionDecode
(
SSdbRaw
*
pRaw
)
{
static
SSdbRow
*
mndSt
b
ActionDecode
(
SSdbRaw
*
pRaw
)
{
int8_t
sver
=
0
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
return
NULL
;
if
(
sver
!=
TSDB_ST
ABLE
_VER_NUM
)
{
if
(
sver
!=
TSDB_ST
B
_VER_NUM
)
{
mError
(
"failed to decode stable since %s"
,
terrstr
());
terrno
=
TSDB_CODE_SDB_INVALID_DATA_VER
;
return
NULL
;
}
int32_t
size
=
sizeof
(
SStable
Obj
)
+
TSDB_MAX_COLUMNS
*
sizeof
(
SSchema
);
SSdbRow
*
pRow
=
sdbAllocRow
(
size
);
SSt
able
Obj
*
pStb
=
sdbGetRowObj
(
pRow
);
int32_t
size
=
sizeof
(
SStb
Obj
)
+
TSDB_MAX_COLUMNS
*
sizeof
(
SSchema
);
SSdbRow
*
pRow
=
sdbAllocRow
(
size
);
SSt
b
Obj
*
pStb
=
sdbGetRowObj
(
pRow
);
if
(
pStb
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
...
...
@@ -128,114 +121,251 @@ static SSdbRow *mndStableActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pStb
->
numOfColumns
)
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pStb
->
numOfTags
)
pStb
->
columnSchema
=
calloc
(
pStb
->
numOfColumns
,
sizeof
(
SSchema
))
;
pStb
->
tagSchema
=
calloc
(
pStb
->
numOfTag
s
,
sizeof
(
SSchema
));
int32_t
totalCols
=
pStb
->
numOfColumns
+
pStb
->
numOfTags
;
pStb
->
pSchema
=
calloc
(
totalCol
s
,
sizeof
(
SSchema
));
for
(
int32_t
i
=
0
;
i
<
pStb
->
numOfColumn
s
;
++
i
)
{
SSchema
*
pSchema
=
&
pStb
->
column
Schema
[
i
];
for
(
int32_t
i
=
0
;
i
<
totalCol
s
;
++
i
)
{
SSchema
*
pSchema
=
&
pStb
->
p
Schema
[
i
];
SDB_GET_INT8
(
pRaw
,
pRow
,
dataPos
,
&
pSchema
->
type
);
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pSchema
->
colId
);
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pSchema
->
bytes
);
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
pSchema
->
name
,
TSDB_COL_NAME_LEN
);
}
for
(
int32_t
i
=
0
;
i
<
pStb
->
numOfTags
;
++
i
)
{
SSchema
*
pSchema
=
&
pStb
->
tagSchema
[
i
];
SDB_GET_INT8
(
pRaw
,
pRow
,
dataPos
,
&
pSchema
->
type
);
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pSchema
->
colId
);
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pSchema
->
bytes
);
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
pSchema
->
name
,
TSDB_COL_NAME_LEN
);
}
SDB_GET_RESERVE
(
pRaw
,
pRow
,
dataPos
,
TSDB_STABLE_RESERVE_SIZE
)
SDB_GET_RESERVE
(
pRaw
,
pRow
,
dataPos
,
TSDB_STB_RESERVE_SIZE
)
return
pRow
;
}
static
int32_t
mndSt
ableActionInsert
(
SSdb
*
pSdb
,
SStable
Obj
*
pStb
)
{
mTrace
(
"st
able
:%s, perform insert action"
,
pStb
->
name
);
static
int32_t
mndSt
bActionInsert
(
SSdb
*
pSdb
,
SStb
Obj
*
pStb
)
{
mTrace
(
"st
b
:%s, perform insert action"
,
pStb
->
name
);
return
0
;
}
static
int32_t
mndSt
ableActionDelete
(
SSdb
*
pSdb
,
SStable
Obj
*
pStb
)
{
mTrace
(
"st
able
:%s, perform delete action"
,
pStb
->
name
);
static
int32_t
mndSt
bActionDelete
(
SSdb
*
pSdb
,
SStb
Obj
*
pStb
)
{
mTrace
(
"st
b
:%s, perform delete action"
,
pStb
->
name
);
return
0
;
}
static
int32_t
mndSt
ableActionUpdate
(
SSdb
*
pSdb
,
SStableObj
*
pOldStb
,
SStable
Obj
*
pNewStb
)
{
mTrace
(
"st
able
:%s, perform update action"
,
pOldStb
->
name
);
static
int32_t
mndSt
bActionUpdate
(
SSdb
*
pSdb
,
SStbObj
*
pOldStb
,
SStb
Obj
*
pNewStb
)
{
mTrace
(
"st
b
:%s, perform update action"
,
pOldStb
->
name
);
atomic_exchange_32
(
&
pOldStb
->
updateTime
,
pNewStb
->
updateTime
);
atomic_exchange_32
(
&
pOldStb
->
version
,
pNewStb
->
version
);
taosWLockLatch
(
&
pOldStb
->
lock
);
int32_t
numOfTags
=
pNewStb
->
numOfTags
;
int32_t
tagSize
=
numOfTags
*
sizeof
(
SSchema
);
int32_t
numOfColumns
=
pNewStb
->
numOfColumns
;
int32_t
columnSize
=
numOfColumns
*
sizeof
(
SSchema
);
int32_t
totalCols
=
pNewStb
->
numOfTags
+
pNewStb
->
numOfColumns
;
int32_t
totalSize
=
totalCols
*
sizeof
(
SSchema
);
if
(
pOldStb
->
numOfTags
<
numOfTags
)
{
pOldStb
->
tagSchema
=
malloc
(
tagSize
);
}
if
(
pOldStb
->
numOfColumns
<
numOfColumns
)
{
pOldStb
->
columnSchema
=
malloc
(
columnSize
);
if
(
pOldStb
->
numOfTags
+
pOldStb
->
numOfColumns
<
totalCols
)
{
pOldStb
->
pSchema
=
malloc
(
totalSize
);
}
memcpy
(
pOldStb
->
tagSchema
,
pNewStb
->
tagSchema
,
tagSize
);
memcpy
(
pOldStb
->
columnSchema
,
pNewStb
->
columnSchema
,
columnSize
);
memcpy
(
pOldStb
->
pSchema
,
pNewStb
->
pSchema
,
totalSize
);
taosWUnLockLatch
(
&
pOldStb
->
lock
);
return
0
;
}
SSt
able
Obj
*
mndAcquireStb
(
SMnode
*
pMnode
,
char
*
stbName
)
{
SSt
b
Obj
*
mndAcquireStb
(
SMnode
*
pMnode
,
char
*
stbName
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
return
sdbAcquire
(
pSdb
,
SDB_ST
ABLE
,
stbName
);
return
sdbAcquire
(
pSdb
,
SDB_ST
B
,
stbName
);
}
void
mndReleaseStb
(
SMnode
*
pMnode
,
SSt
able
Obj
*
pStb
)
{
void
mndReleaseStb
(
SMnode
*
pMnode
,
SSt
b
Obj
*
pStb
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbRelease
(
pSdb
,
pStb
);
}
static
int32_t
mndProcessCreateStableMsg
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
static
SDbObj
*
mndAcquireDbByStb
(
SMnode
*
pMnode
,
char
*
stbName
)
{
SName
name
=
{
0
};
tNameFromString
(
&
name
,
stbName
,
T_NAME_ACCT
|
T_NAME_DB
|
T_NAME_TABLE
);
static
int32_t
mndProcessCreateStableInRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
char
db
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
tNameGetFullDbName
(
&
name
,
db
);
static
int32_t
mndProcessAlterStableMsg
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
return
mndAcquireDb
(
pMnode
,
db
);
}
static
int32_t
mndProcessAlterStableInRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
static
int32_t
mndCheckStbMsg
(
SCreateStbMsg
*
pCreate
)
{
pCreate
->
numOfColumns
=
htonl
(
pCreate
->
numOfColumns
);
pCreate
->
numOfTags
=
htonl
(
pCreate
->
numOfTags
);
int32_t
totalCols
=
pCreate
->
numOfColumns
+
pCreate
->
numOfTags
;
for
(
int32_t
i
=
0
;
i
<
totalCols
;
++
i
)
{
SSchema
*
pSchema
=
&
pCreate
->
pSchema
[
i
];
pSchema
->
colId
=
htonl
(
pSchema
->
colId
);
pSchema
->
bytes
=
htonl
(
pSchema
->
bytes
);
}
static
int32_t
mndProcessDropStableMsg
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
if
(
pCreate
->
igExists
<
0
||
pCreate
->
igExists
>
1
)
{
terrno
=
TSDB_CODE_MND_STB_INVALID_IGEXIST
;
return
-
1
;
}
static
int32_t
mndProcessDropStableInRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
if
(
pCreate
->
numOfColumns
<
TSDB_MIN_COLUMNS
||
pCreate
->
numOfColumns
>
TSDB_MAX_COLUMNS
)
{
terrno
=
TSDB_CODE_MND_STB_INVALID_COLS_NUM
;
return
-
1
;
}
static
SDbObj
*
mndGetDbByStbName
(
SMnode
*
pMnode
,
char
*
stbName
)
{
SName
name
=
{
0
};
tNameFromString
(
&
name
,
stbName
,
T_NAME_ACCT
|
T_NAME_DB
|
T_NAME_TABLE
);
if
(
pCreate
->
numOfTags
<=
0
||
pCreate
->
numOfTags
>
TSDB_MAX_TAGS
)
{
terrno
=
TSDB_CODE_MND_STB_INVALID_TAGS_NUM
;
return
-
1
;
}
char
db
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
tNameGetFullDbName
(
&
name
,
db
);
int32_t
maxColId
=
(
TSDB_MAX_COLUMNS
+
TSDB_MAX_TAGS
);
for
(
int32_t
i
=
0
;
i
<
totalCols
;
++
i
)
{
SSchema
*
pSchema
=
&
pCreate
->
pSchema
[
i
];
if
(
pSchema
->
type
<=
0
)
{
terrno
=
TSDB_CODE_MND_STB_INVALID_COL_TYPE
;
return
-
1
;
}
if
(
pSchema
->
colId
<
0
||
pSchema
->
colId
>=
maxColId
)
{
terrno
=
TSDB_CODE_MND_STB_INVALID_COL_ID
;
return
-
1
;
}
if
(
pSchema
->
bytes
<=
0
)
{
terrno
=
TSDB_CODE_MND_STB_INVALID_COL_BYTES
;
return
-
1
;
}
if
(
pSchema
->
name
[
0
]
==
0
)
{
terrno
=
TSDB_CODE_MND_STB_INVALID_COL_NAME
;
return
-
1
;
}
}
return
mndAcquireDb
(
pMnode
,
db
);
return
0
;
}
static
int32_t
mndCreateStb
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SCreateStbMsg
*
pCreate
,
SDbObj
*
pDb
)
{
SStbObj
stbObj
=
{
0
};
tstrncpy
(
stbObj
.
name
,
pCreate
->
name
,
TSDB_TABLE_FNAME_LEN
);
tstrncpy
(
stbObj
.
db
,
pDb
->
name
,
TSDB_FULL_DB_NAME_LEN
);
stbObj
.
createdTime
=
taosGetTimestampMs
();
stbObj
.
updateTime
=
stbObj
.
createdTime
;
stbObj
.
uid
=
1234
;
stbObj
.
version
=
1
;
stbObj
.
numOfColumns
=
pCreate
->
numOfColumns
;
stbObj
.
numOfTags
=
pCreate
->
numOfTags
;
int32_t
totalCols
=
stbObj
.
numOfColumns
+
stbObj
.
numOfTags
;
int32_t
totalSize
=
totalCols
*
sizeof
(
SSchema
);
stbObj
.
pSchema
=
malloc
(
totalSize
);
if
(
stbObj
.
pSchema
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
memcpy
(
stbObj
.
pSchema
,
pCreate
->
pSchema
,
totalSize
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
pMsg
->
rpcMsg
.
handle
);
if
(
pTrans
==
NULL
)
{
mError
(
"stb:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
}
mDebug
(
"trans:%d, used to create stb:%s"
,
pTrans
->
id
,
pCreate
->
name
);
SSdbRaw
*
pRedoRaw
=
mndStbActionEncode
(
&
stbObj
);
if
(
pRedoRaw
==
NULL
||
mndTransAppendRedolog
(
pTrans
,
pRedoRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to append redo log since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
}
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_CREATING
);
SSdbRaw
*
pUndoRaw
=
mndStbActionEncode
(
&
stbObj
);
if
(
pUndoRaw
==
NULL
||
mndTransAppendUndolog
(
pTrans
,
pUndoRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to append undo log since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
}
sdbSetRawStatus
(
pUndoRaw
,
SDB_STATUS_DROPPED
);
SSdbRaw
*
pCommitRaw
=
mndStbActionEncode
(
&
stbObj
);
if
(
pCommitRaw
==
NULL
||
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
{
mError
(
"trans:%d, failed to append commit log since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
}
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_READY
);
if
(
mndTransPrepare
(
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
}
mndTransDrop
(
pTrans
);
return
0
;
}
static
int32_t
mndProcess
StableMeta
Msg
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
S
StableInfoMsg
*
pInfo
=
pMsg
->
rpcMsg
.
pCont
;
static
int32_t
mndProcess
CreateStb
Msg
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
S
CreateStbMsg
*
pCreate
=
pMsg
->
rpcMsg
.
pCont
;
mDebug
(
"st
able:%s, start to retrieve meta"
,
pInfo
->
name
);
mDebug
(
"st
b:%s, start to create"
,
pCreate
->
name
);
SDbObj
*
pDb
=
mndGetDbByStbName
(
pMnode
,
pInfo
->
name
);
if
(
mndCheckStbMsg
(
pCreate
)
!=
0
)
{
mError
(
"stb:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
}
SStbObj
*
pStb
=
mndAcquireStb
(
pMnode
,
pCreate
->
name
);
if
(
pStb
!=
NULL
)
{
sdbRelease
(
pMnode
->
pSdb
,
pStb
);
if
(
pCreate
->
igExists
)
{
mDebug
(
"stb:%s, already exist, ignore exist is set"
,
pCreate
->
name
);
return
0
;
}
else
{
terrno
=
TSDB_CODE_MND_STB_ALREADY_EXIST
;
mError
(
"db:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
}
}
SDbObj
*
pDb
=
mndAcquireDbByStb
(
pMnode
,
pCreate
->
name
);
if
(
pDb
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DB_NOT_SELECTED
;
mError
(
"stb:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
}
int32_t
code
=
mndCreateStb
(
pMnode
,
pMsg
,
pCreate
,
pDb
);
mndReleaseDb
(
pMnode
,
pDb
);
if
(
code
!=
0
)
{
terrno
=
code
;
mError
(
"stb:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
}
return
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
}
static
int32_t
mndProcessCreateStbInRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
static
int32_t
mndProcessAlterStbMsg
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
static
int32_t
mndProcessAlterStbInRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
static
int32_t
mndProcessDropStbMsg
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
static
int32_t
mndProcessDropStbInRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
static
int32_t
mndProcessStbMetaMsg
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SStbInfoMsg
*
pInfo
=
pMsg
->
rpcMsg
.
pCont
;
mDebug
(
"stb:%s, start to retrieve meta"
,
pInfo
->
name
);
SDbObj
*
pDb
=
mndAcquireDbByStb
(
pMnode
,
pInfo
->
name
);
if
(
pDb
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DB_NOT_SELECTED
;
mError
(
"st
able
:%s, failed to retrieve meta since %s"
,
pInfo
->
name
,
terrstr
());
mError
(
"st
b
:%s, failed to retrieve meta since %s"
,
pInfo
->
name
,
terrstr
());
return
-
1
;
}
SSt
able
Obj
*
pStb
=
mndAcquireStb
(
pMnode
,
pInfo
->
name
);
SSt
b
Obj
*
pStb
=
mndAcquireStb
(
pMnode
,
pInfo
->
name
);
if
(
pStb
==
NULL
)
{
mndReleaseDb
(
pMnode
,
pDb
);
terrno
=
TSDB_CODE_MND_INVALID_TABLE_NAME
;
mError
(
"st
able
:%s, failed to get meta since %s"
,
pInfo
->
name
,
terrstr
());
mError
(
"st
b
:%s, failed to get meta since %s"
,
pInfo
->
name
,
terrstr
());
return
-
1
;
}
...
...
@@ -243,11 +373,11 @@ static int32_t mndProcessStableMetaMsg(SMnodeMsg *pMsg) {
STableMetaMsg
*
pMeta
=
rpcMallocCont
(
contLen
);
if
(
pMeta
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
mError
(
"st
able
:%s, failed to get meta since %s"
,
pInfo
->
name
,
terrstr
());
mError
(
"st
b
:%s, failed to get meta since %s"
,
pInfo
->
name
,
terrstr
());
return
-
1
;
}
memcpy
(
pMeta
->
st
able
Fname
,
pStb
->
name
,
TSDB_TABLE_FNAME_LEN
);
memcpy
(
pMeta
->
st
b
Fname
,
pStb
->
name
,
TSDB_TABLE_FNAME_LEN
);
pMeta
->
numOfTags
=
htonl
(
pStb
->
numOfTags
);
pMeta
->
numOfColumns
=
htonl
(
pStb
->
numOfColumns
);
pMeta
->
precision
=
pDb
->
cfg
.
precision
;
...
...
@@ -258,30 +388,21 @@ static int32_t mndProcessStableMetaMsg(SMnodeMsg *pMsg) {
for
(
int32_t
i
=
0
;
i
<
pStb
->
numOfColumns
;
++
i
)
{
SSchema
*
pSchema
=
&
pMeta
->
pSchema
[
i
];
SSchema
*
pColumn
=
&
pStb
->
columnSchema
[
i
];
memcpy
(
pSchema
->
name
,
pColumn
->
name
,
TSDB_COL_NAME_LEN
);
pSchema
->
type
=
pColumn
->
type
;
pSchema
->
colId
=
htonl
(
pColumn
->
colId
);
pSchema
->
bytes
=
htonl
(
pColumn
->
bytes
);
}
for
(
int32_t
i
=
0
;
i
<
pStb
->
numOfTags
;
++
i
)
{
SSchema
*
pSchema
=
&
pMeta
->
pSchema
[
i
+
pStb
->
numOfColumns
];
SSchema
*
pTag
=
&
pStb
->
tagSchema
[
i
];
memcpy
(
pSchema
->
name
,
pTag
->
name
,
TSDB_COL_NAME_LEN
);
pSchema
->
type
=
pTag
->
type
;
pSchema
->
colId
=
htons
(
pTag
->
colId
);
pSchema
->
bytes
=
htonl
(
pTag
->
bytes
);
SSchema
*
pSrcSchema
=
&
pStb
->
pSchema
[
i
];
memcpy
(
pSchema
->
name
,
pSrcSchema
->
name
,
TSDB_COL_NAME_LEN
);
pSchema
->
type
=
pSrcSchema
->
type
;
pSchema
->
colId
=
htonl
(
pSrcSchema
->
colId
);
pSchema
->
bytes
=
htonl
(
pSrcSchema
->
bytes
);
}
pMsg
->
pCont
=
pMeta
;
pMsg
->
contLen
=
contLen
;
mDebug
(
"st
able
:%s, meta is retrieved, cols:%d tags:%d"
,
pInfo
->
name
,
pStb
->
numOfColumns
,
pStb
->
numOfTags
);
mDebug
(
"st
b
:%s, meta is retrieved, cols:%d tags:%d"
,
pInfo
->
name
,
pStb
->
numOfColumns
,
pStb
->
numOfTags
);
return
0
;
}
static
int32_t
mndGetNumOfSt
ables
(
SMnode
*
pMnode
,
char
*
dbName
,
int32_t
*
pNumOfStable
s
)
{
static
int32_t
mndGetNumOfSt
bs
(
SMnode
*
pMnode
,
char
*
dbName
,
int32_t
*
pNumOfStb
s
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
dbName
);
...
...
@@ -290,29 +411,29 @@ static int32_t mndGetNumOfStables(SMnode *pMnode, char *dbName, int32_t *pNumOfS
return
-
1
;
}
int32_t
numOfSt
able
s
=
0
;
int32_t
numOfSt
b
s
=
0
;
void
*
pIter
=
NULL
;
while
(
1
)
{
SSt
able
Obj
*
pStb
=
NULL
;
SSt
b
Obj
*
pStb
=
NULL
;
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pStb
);
if
(
pIter
==
NULL
)
break
;
if
(
strcmp
(
pStb
->
db
,
dbName
)
==
0
)
{
numOfSt
able
s
++
;
numOfSt
b
s
++
;
}
sdbRelease
(
pSdb
,
pStb
);
}
*
pNumOfSt
ables
=
numOfStable
s
;
*
pNumOfSt
bs
=
numOfStb
s
;
return
0
;
}
static
int32_t
mndGetSt
able
Meta
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
STableMetaMsg
*
pMeta
)
{
static
int32_t
mndGetSt
b
Meta
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
STableMetaMsg
*
pMeta
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
if
(
mndGetNumOfSt
able
s
(
pMnode
,
pShow
->
db
,
&
pShow
->
numOfRows
)
!=
0
)
{
if
(
mndGetNumOfSt
b
s
(
pMnode
,
pShow
->
db
,
&
pShow
->
numOfRows
)
!=
0
)
{
return
-
1
;
}
...
...
@@ -357,7 +478,7 @@ static int32_t mndGetStableMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg
return
0
;
}
static
void
mnodeExtractTableName
(
char
*
tableId
,
char
*
name
)
{
static
void
mnodeExtractTableName
(
char
*
tableId
,
char
*
name
)
{
int
pos
=
-
1
;
int
num
=
0
;
for
(
pos
=
0
;
tableId
[
pos
]
!=
0
;
++
pos
)
{
...
...
@@ -370,21 +491,21 @@ static void mnodeExtractTableName(char* tableId, char* name) {
}
}
static
int32_t
mndRetrieveSt
ables
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
numOfRows
=
0
;
SSt
able
Obj
*
pStb
=
NULL
;
int32_t
cols
=
0
;
char
*
pWrite
;
char
prefix
[
64
]
=
{
0
};
static
int32_t
mndRetrieveSt
b
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
numOfRows
=
0
;
SSt
b
Obj
*
pStb
=
NULL
;
int32_t
cols
=
0
;
char
*
pWrite
;
char
prefix
[
64
]
=
{
0
};
tstrncpy
(
prefix
,
pShow
->
db
,
64
);
strcat
(
prefix
,
TS_PATH_DELIMITER
);
int32_t
prefixLen
=
(
int32_t
)
strlen
(
prefix
);
while
(
numOfRows
<
rows
)
{
pShow
->
pIter
=
sdbFetch
(
pSdb
,
SDB_ST
ABLE
,
pShow
->
pIter
,
(
void
**
)
&
pStb
);
pShow
->
pIter
=
sdbFetch
(
pSdb
,
SDB_ST
B
,
pShow
->
pIter
,
(
void
**
)
&
pStb
);
if
(
pShow
->
pIter
==
NULL
)
break
;
if
(
strncmp
(
pStb
->
name
,
prefix
,
prefixLen
)
!=
0
)
{
...
...
@@ -394,10 +515,10 @@ static int32_t mndRetrieveStables(SMnodeMsg *pMsg, SShowObj *pShow, char *data,
cols
=
0
;
char
st
able
Name
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
memcpy
(
st
able
Name
,
pStb
->
name
+
prefixLen
,
TSDB_TABLE_FNAME_LEN
-
prefixLen
);
char
st
b
Name
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
memcpy
(
st
b
Name
,
pStb
->
name
+
prefixLen
,
TSDB_TABLE_FNAME_LEN
-
prefixLen
);
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
STR_TO_VARSTR
(
pWrite
,
st
able
Name
);
STR_TO_VARSTR
(
pWrite
,
st
b
Name
);
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
...
...
@@ -421,7 +542,7 @@ static int32_t mndRetrieveStables(SMnodeMsg *pMsg, SShowObj *pShow, char *data,
return
numOfRows
;
}
static
void
mndCancelGetNextSt
able
(
SMnode
*
pMnode
,
void
*
pIter
)
{
static
void
mndCancelGetNextSt
b
(
SMnode
*
pMnode
,
void
*
pIter
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbCancelFetch
(
pSdb
,
pIter
);
}
\ No newline at end of file
source/dnode/mnode/impl/src/mnode.c
浏览文件 @
2164b8ef
...
...
@@ -131,7 +131,7 @@ static int32_t mndInitSteps(SMnode *pMnode) {
if
(
mndAllocStep
(
pMnode
,
"mnode-user"
,
mndInitUser
,
mndCleanupUser
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-db"
,
mndInitDb
,
mndCleanupDb
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-vgroup"
,
mndInitVgroup
,
mndCleanupVgroup
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-st
able"
,
mndInitStable
,
mndCleanupStable
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-st
b"
,
mndInitStb
,
mndCleanupStb
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-func"
,
mndInitFunc
,
mndCleanupFunc
)
!=
0
)
return
-
1
;
if
(
pMnode
->
clusterId
<=
0
)
{
if
(
mndAllocStep
(
pMnode
,
"mnode-sdb-deploy"
,
mndDeploySdb
,
NULL
)
!=
0
)
return
-
1
;
...
...
source/libs/parser/inc/astGenerator.h
浏览文件 @
2164b8ef
...
...
@@ -121,7 +121,7 @@ typedef struct SRelationInfo {
typedef
struct
SCreatedTableInfo
{
SToken
name
;
// table name token
SToken
st
ableName
;
// super table name token , for using clause
SToken
st
bName
;
// super table name token , for using clause
SArray
*
pTagNames
;
// create by using super table, tag name
SArray
*
pTagVals
;
// create by using super table, tag value
char
*
fullname
;
// table full name
...
...
source/libs/parser/inc/sql.y
浏览文件 @
2164b8ef
...
...
@@ -112,13 +112,13 @@ cmd ::= SHOW dbPrefix(X) TABLES LIKE ids(Y). {
}
cmd ::= SHOW dbPrefix(X) STABLES. {
setShowOptions(pInfo, TSDB_MGMT_TABLE_ST
ABLE
, &X, 0);
setShowOptions(pInfo, TSDB_MGMT_TABLE_ST
B
, &X, 0);
}
cmd ::= SHOW dbPrefix(X) STABLES LIKE ids(Y). {
SToken token;
tSetDbName(&token, &X);
setShowOptions(pInfo, TSDB_MGMT_TABLE_ST
ABLE
, &token, &Y);
setShowOptions(pInfo, TSDB_MGMT_TABLE_ST
B
, &token, &Y);
}
cmd ::= SHOW dbPrefix(X) VGROUPS. {
...
...
source/libs/parser/src/astGenerator.c
浏览文件 @
2164b8ef
...
...
@@ -634,7 +634,7 @@ SCreatedTableInfo createNewChildTableInfo(SToken *pTableName, SArray *pTagNames,
info
.
name
=
*
pToken
;
info
.
pTagNames
=
pTagNames
;
info
.
pTagVals
=
pTagVals
;
info
.
st
ableName
=
*
pTableName
;
info
.
st
bName
=
*
pTableName
;
info
.
igExist
=
(
igExists
->
n
>
0
)
?
1
:
0
;
return
info
;
...
...
source/libs/parser/src/sql.c
浏览文件 @
2164b8ef
...
...
@@ -2312,14 +2312,14 @@ static void yy_reduce(
break
;
case
26
:
/* cmd ::= SHOW dbPrefix STABLES */
{
setShowOptions
(
pInfo
,
TSDB_MGMT_TABLE_ST
ABLE
,
&
yymsp
[
-
1
].
minor
.
yy0
,
0
);
setShowOptions
(
pInfo
,
TSDB_MGMT_TABLE_ST
B
,
&
yymsp
[
-
1
].
minor
.
yy0
,
0
);
}
break
;
case
27
:
/* cmd ::= SHOW dbPrefix STABLES LIKE ids */
{
SToken
token
;
tSetDbName
(
&
token
,
&
yymsp
[
-
3
].
minor
.
yy0
);
setShowOptions
(
pInfo
,
TSDB_MGMT_TABLE_ST
ABLE
,
&
token
,
&
yymsp
[
0
].
minor
.
yy0
);
setShowOptions
(
pInfo
,
TSDB_MGMT_TABLE_ST
B
,
&
token
,
&
yymsp
[
0
].
minor
.
yy0
);
}
break
;
case
28
:
/* cmd ::= SHOW dbPrefix VGROUPS */
...
...
source/libs/transport/src/rpcMain.c
浏览文件 @
2164b8ef
...
...
@@ -406,7 +406,7 @@ void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t
// for TDengine, all the query, show commands shall have TCP connection
char
type
=
pMsg
->
msgType
;
if
(
type
==
TSDB_MSG_TYPE_QUERY
||
type
==
TSDB_MSG_TYPE_SHOW_RETRIEVE
||
type
==
TSDB_MSG_TYPE_FETCH
||
type
==
TSDB_MSG_TYPE_ST
ABLE
_VGROUP
||
type
==
TSDB_MSG_TYPE_FETCH
||
type
==
TSDB_MSG_TYPE_ST
B
_VGROUP
||
type
==
TSDB_MSG_TYPE_TABLES_META
||
type
==
TSDB_MSG_TYPE_TABLE_META
||
type
==
TSDB_MSG_TYPE_SHOW
||
type
==
TSDB_MSG_TYPE_STATUS
||
type
==
TSDB_MSG_TYPE_ALTER_TABLE
)
pContext
->
connType
=
RPC_CONN_TCPC
;
...
...
source/util/src/terror.c
浏览文件 @
2164b8ef
...
...
@@ -193,8 +193,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_USERS, "Too many users")
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_MNODE_ALREADY_EXIST
,
"Mnode already exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_MNODE_NOT_EXIST
,
"Mnode not there"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_
TABLE_ALREADY_EXIST
,
"T
able already exists"
)
// mnode-stable
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_
STB_ALREADY_EXIST
,
"St
able already exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_INVALID_TABLE_ID
,
"Table name too long"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_INVALID_TABLE_NAME
,
"Table does not exist"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_INVALID_TABLE_TYPE
,
"Invalid table type in tsdb"
)
...
...
src/client/inc/tscParseLine.h
浏览文件 @
2164b8ef
...
...
@@ -28,7 +28,7 @@ typedef struct {
}
TAOS_SML_KV
;
typedef
struct
{
char
*
st
able
Name
;
char
*
st
b
Name
;
char
*
childTableName
;
TAOS_SML_KV
*
tags
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录