Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
bed4c349
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
bed4c349
编写于
12月 24, 2021
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into feature/vnode
上级
9acdf1e9
7205a9ce
变更
18
隐藏空白更改
内联
并排
Showing
18 changed file
with
1131 addition
and
72 deletion
+1131
-72
include/common/tmsg.h
include/common/tmsg.h
+37
-0
include/common/tname.h
include/common/tname.h
+4
-0
include/dnode/mnode/sdb/sdb.h
include/dnode/mnode/sdb/sdb.h
+8
-5
include/util/taoserror.h
include/util/taoserror.h
+10
-0
include/util/tdef.h
include/util/tdef.h
+2
-0
source/dnode/mgmt/impl/src/dndVnodes.c
source/dnode/mgmt/impl/src/dndVnodes.c
+1
-0
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+15
-0
source/dnode/mnode/impl/inc/mndStb.h
source/dnode/mnode/impl/inc/mndStb.h
+3
-0
source/dnode/mnode/impl/inc/mndTopic.h
source/dnode/mnode/impl/inc/mndTopic.h
+35
-0
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+119
-41
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+10
-1
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+857
-0
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+2
-1
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+10
-2
source/dnode/mnode/impl/src/mnode.c
source/dnode/mnode/impl/src/mnode.c
+4
-2
source/dnode/mnode/sdb/src/sdbHash.c
source/dnode/mnode/sdb/src/sdbHash.c
+1
-0
tests/script/general/db/basic1.sim
tests/script/general/db/basic1.sim
+0
-1
tests/script/general/table/basic1.sim
tests/script/general/table/basic1.sim
+13
-19
未找到文件。
include/common/tmsg.h
浏览文件 @
bed4c349
...
@@ -1056,6 +1056,43 @@ typedef struct STaskDropRsp {
...
@@ -1056,6 +1056,43 @@ typedef struct STaskDropRsp {
int32_t
code
;
int32_t
code
;
}
STaskDropRsp
;
}
STaskDropRsp
;
typedef
struct
{
char
name
[
TSDB_TOPIC_FNAME_LEN
];
int8_t
igExists
;
int32_t
execLen
;
void
*
executor
;
int32_t
sqlLen
;
char
*
sql
;
}
SCreateTopicMsg
;
typedef
struct
{
char
name
[
TSDB_TABLE_FNAME_LEN
];
int8_t
igNotExists
;
}
SDropTopicMsg
;
typedef
struct
{
char
name
[
TSDB_TABLE_FNAME_LEN
];
int8_t
alterType
;
SSchema
schema
;
}
SAlterTopicMsg
;
typedef
struct
{
SMsgHead
head
;
char
name
[
TSDB_TABLE_FNAME_LEN
];
uint64_t
tuid
;
int32_t
sverson
;
int32_t
execLen
;
char
*
executor
;
int32_t
sqlLen
;
char
*
sql
;
}
SCreateTopicInternalMsg
;
typedef
struct
{
SMsgHead
head
;
char
name
[
TSDB_TABLE_FNAME_LEN
];
uint64_t
tuid
;
}
SDropTopicInternalMsg
;
#pragma pack(pop)
#pragma pack(pop)
#ifdef __cplusplus
#ifdef __cplusplus
...
...
include/common/tname.h
浏览文件 @
bed4c349
...
@@ -16,18 +16,22 @@
...
@@ -16,18 +16,22 @@
#ifndef TDENGINE_TNAME_H
#ifndef TDENGINE_TNAME_H
#define TDENGINE_TNAME_H
#define TDENGINE_TNAME_H
#include "tdef.h"
#define TSDB_DB_NAME_T 1
#define TSDB_DB_NAME_T 1
#define TSDB_TABLE_NAME_T 2
#define TSDB_TABLE_NAME_T 2
#define T_NAME_ACCT 0x1u
#define T_NAME_ACCT 0x1u
#define T_NAME_DB 0x2u
#define T_NAME_DB 0x2u
#define T_NAME_TABLE 0x4u
#define T_NAME_TABLE 0x4u
#define T_NAME_TOPIC 0x8u
typedef
struct
SName
{
typedef
struct
SName
{
uint8_t
type
;
//db_name_t, table_name_t
uint8_t
type
;
//db_name_t, table_name_t
int32_t
acctId
;
int32_t
acctId
;
char
dbname
[
TSDB_DB_NAME_LEN
];
char
dbname
[
TSDB_DB_NAME_LEN
];
char
tname
[
TSDB_TABLE_NAME_LEN
];
char
tname
[
TSDB_TABLE_NAME_LEN
];
char
topicName
[
TSDB_TOPIC_NAME_LEN
];
}
SName
;
}
SName
;
int32_t
tNameExtractFullName
(
const
SName
*
name
,
char
*
dst
);
int32_t
tNameExtractFullName
(
const
SName
*
name
,
char
*
dst
);
...
...
include/dnode/mnode/sdb/sdb.h
浏览文件 @
bed4c349
...
@@ -16,6 +16,8 @@
...
@@ -16,6 +16,8 @@
#ifndef _TD_SDB_H_
#ifndef _TD_SDB_H_
#define _TD_SDB_H_
#define _TD_SDB_H_
#include "os.h"
#ifdef __cplusplus
#ifdef __cplusplus
extern
"C"
{
extern
"C"
{
#endif
#endif
...
@@ -159,11 +161,12 @@ typedef enum {
...
@@ -159,11 +161,12 @@ typedef enum {
SDB_USER
=
5
,
SDB_USER
=
5
,
SDB_AUTH
=
6
,
SDB_AUTH
=
6
,
SDB_ACCT
=
7
,
SDB_ACCT
=
7
,
SDB_VGROUP
=
8
,
SDB_TOPIC
=
8
,
SDB_STB
=
9
,
SDB_VGROUP
=
9
,
SDB_DB
=
10
,
SDB_STB
=
10
,
SDB_FUNC
=
11
,
SDB_DB
=
11
,
SDB_MAX
=
12
SDB_FUNC
=
12
,
SDB_MAX
=
13
}
ESdbType
;
}
ESdbType
;
typedef
struct
SSdb
SSdb
;
typedef
struct
SSdb
SSdb
;
...
...
include/util/taoserror.h
浏览文件 @
bed4c349
...
@@ -220,6 +220,7 @@ int32_t* taosGetErrno();
...
@@ -220,6 +220,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_COLUMN_ALREAY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AA)
#define TSDB_CODE_MND_COLUMN_ALREAY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AA)
#define TSDB_CODE_MND_COLUMN_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AB)
#define TSDB_CODE_MND_COLUMN_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AB)
#define TSDB_CODE_MND_EXCEED_MAX_ROW_BYTES TAOS_DEF_ERROR_CODE(0, 0x03AC)
#define TSDB_CODE_MND_EXCEED_MAX_ROW_BYTES TAOS_DEF_ERROR_CODE(0, 0x03AC)
#define TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03AD)
// mnode-func
// mnode-func
#define TSDB_CODE_MND_FUNC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03C0)
#define TSDB_CODE_MND_FUNC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03C0)
...
@@ -234,6 +235,15 @@ int32_t* taosGetErrno();
...
@@ -234,6 +235,15 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0)
#define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0)
#define TSDB_CODE_MND_TRANS_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D1)
#define TSDB_CODE_MND_TRANS_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D1)
// mnode-topic
#define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0)
#define TSDB_CODE_MND_TOPIC_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E1)
#define TSDB_CODE_MND_TOO_MANY_TOPICS TAOS_DEF_ERROR_CODE(0, 0x03E2)
#define TSDB_CODE_MND_INVALID_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03E3)
#define TSDB_CODE_MND_INVALID_TOPIC_OPTION TAOS_DEF_ERROR_CODE(0, 0x03E4)
#define TSDB_CODE_MND_TOPIC_OPTION_UNCHNAGED TAOS_DEF_ERROR_CODE(0, 0x03E5)
#define TSDB_CODE_MND_NAME_CONFLICT_WITH_STB TAOS_DEF_ERROR_CODE(0, 0x03E6)
// dnode
// dnode
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400)
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400)
#define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x0401)
#define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x0401)
...
...
include/util/tdef.h
浏览文件 @
bed4c349
...
@@ -163,6 +163,7 @@ do { \
...
@@ -163,6 +163,7 @@ do { \
#define TSDB_NODE_NAME_LEN 64
#define TSDB_NODE_NAME_LEN 64
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
#define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string
#define TSDB_DB_NAME_LEN 65
#define TSDB_DB_NAME_LEN 65
#define TSDB_FULL_DB_NAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN)
#define TSDB_FULL_DB_NAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN)
...
@@ -175,6 +176,7 @@ do { \
...
@@ -175,6 +176,7 @@ do { \
#define TSDB_TYPE_STR_MAX_LEN 32
#define TSDB_TYPE_STR_MAX_LEN 32
#define TSDB_TABLE_FNAME_LEN (TSDB_FULL_DB_NAME_LEN + TSDB_TABLE_NAME_LEN)
#define TSDB_TABLE_FNAME_LEN (TSDB_FULL_DB_NAME_LEN + TSDB_TABLE_NAME_LEN)
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
#define TSDB_COL_NAME_LEN 65
#define TSDB_COL_NAME_LEN 65
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
...
...
source/dnode/mgmt/impl/src/dndVnodes.c
浏览文件 @
bed4c349
...
@@ -829,6 +829,7 @@ static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t
...
@@ -829,6 +829,7 @@ static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t
SRpcMsg
*
pMsg
=
*
(
SRpcMsg
**
)
taosArrayGet
(
pArray
,
i
);
SRpcMsg
*
pMsg
=
*
(
SRpcMsg
**
)
taosArrayGet
(
pArray
,
i
);
int32_t
code
=
vnodeApplyWMsg
(
pVnode
->
pImpl
,
pMsg
,
&
pRsp
);
int32_t
code
=
vnodeApplyWMsg
(
pVnode
->
pImpl
,
pMsg
,
&
pRsp
);
if
(
pRsp
!=
NULL
)
{
if
(
pRsp
!=
NULL
)
{
pRsp
->
ahandle
=
pMsg
->
ahandle
;
rpcSendResponse
(
pRsp
);
rpcSendResponse
(
pRsp
);
free
(
pRsp
);
free
(
pRsp
);
}
else
{
}
else
{
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
bed4c349
...
@@ -285,6 +285,21 @@ typedef struct {
...
@@ -285,6 +285,21 @@ typedef struct {
char
payload
[];
char
payload
[];
}
SShowObj
;
}
SShowObj
;
typedef
struct
{
char
name
[
TSDB_TOPIC_FNAME_LEN
];
char
db
[
TSDB_FULL_DB_NAME_LEN
];
int64_t
createTime
;
int64_t
updateTime
;
uint64_t
uid
;
uint64_t
dbUid
;
int32_t
version
;
SRWLatch
lock
;
int32_t
execLen
;
void
*
executor
;
int32_t
sqlLen
;
char
*
sql
;
}
STopicObj
;
typedef
struct
SMnodeMsg
{
typedef
struct
SMnodeMsg
{
char
user
[
TSDB_USER_LEN
];
char
user
[
TSDB_USER_LEN
];
char
db
[
TSDB_FULL_DB_NAME_LEN
];
char
db
[
TSDB_FULL_DB_NAME_LEN
];
...
...
source/dnode/mnode/impl/inc/mndStb.h
浏览文件 @
bed4c349
...
@@ -25,6 +25,9 @@ extern "C" {
...
@@ -25,6 +25,9 @@ extern "C" {
int32_t
mndInitStb
(
SMnode
*
pMnode
);
int32_t
mndInitStb
(
SMnode
*
pMnode
);
void
mndCleanupStb
(
SMnode
*
pMnode
);
void
mndCleanupStb
(
SMnode
*
pMnode
);
SStbObj
*
mndAcquireStb
(
SMnode
*
pMnode
,
char
*
stbName
);
void
mndReleaseStb
(
SMnode
*
pMnode
,
SStbObj
*
pStb
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
source/dnode/mnode/impl/inc/mndTopic.h
0 → 100644
浏览文件 @
bed4c349
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_MND_TOPIC_H_
#define _TD_MND_TOPIC_H_
#include "mndInt.h"
#ifdef __cplusplus
extern
"C"
{
#endif
int32_t
mndInitTopic
(
SMnode
*
pMnode
);
void
mndCleanupTopic
(
SMnode
*
pMnode
);
STopicObj
*
mndAcquireTopic
(
SMnode
*
pMnode
,
char
*
topicName
);
void
mndReleaseTopic
(
SMnode
*
pMnode
,
STopicObj
*
pTopic
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_MND_TOPIC_H_*/
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
bed4c349
...
@@ -541,6 +541,15 @@ static int32_t mndSetDbCfgFromAlterDbMsg(SDbObj *pDb, SAlterDbMsg *pAlter) {
...
@@ -541,6 +541,15 @@ static int32_t mndSetDbCfgFromAlterDbMsg(SDbObj *pDb, SAlterDbMsg *pAlter) {
}
}
static
int32_t
mndSetUpdateDbRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pOldDb
,
SDbObj
*
pNewDb
)
{
static
int32_t
mndSetUpdateDbRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pOldDb
,
SDbObj
*
pNewDb
)
{
SSdbRaw
*
pRedoRaw
=
mndDbActionEncode
(
pOldDb
);
if
(
pRedoRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendRedolog
(
pTrans
,
pRedoRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_UPDATING
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetUpdateDbCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pOldDb
,
SDbObj
*
pNewDb
)
{
SSdbRaw
*
pRedoRaw
=
mndDbActionEncode
(
pNewDb
);
SSdbRaw
*
pRedoRaw
=
mndDbActionEncode
(
pNewDb
);
if
(
pRedoRaw
==
NULL
)
return
-
1
;
if
(
pRedoRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendRedolog
(
pTrans
,
pRedoRaw
)
!=
0
)
return
-
1
;
if
(
mndTransAppendRedolog
(
pTrans
,
pRedoRaw
)
!=
0
)
return
-
1
;
...
@@ -549,24 +558,57 @@ static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pO
...
@@ -549,24 +558,57 @@ static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pO
return
0
;
return
0
;
}
}
static
int32_t
mndSetUpdateDbUndoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pOldDb
,
SDbObj
*
pNewDb
)
{
static
int32_t
mndBuildUpdateVgroupAction
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
)
{
SSdbRaw
*
pUndoRaw
=
mndDbActionEncode
(
pOldDb
);
for
(
int32_t
vn
=
0
;
vn
<
pVgroup
->
replica
;
++
vn
)
{
if
(
pUndoRaw
==
NULL
)
return
-
1
;
STransAction
action
=
{
0
};
if
(
mndTransAppendUndolog
(
pTrans
,
pUndoRaw
)
!=
0
)
return
-
1
;
SVnodeGid
*
pVgid
=
pVgroup
->
vnodeGid
+
vn
;
if
(
sdbSetRawStatus
(
pUndoRaw
,
SDB_STATUS_READY
)
!=
0
)
return
-
1
;
SDnodeObj
*
pDnode
=
mndAcquireDnode
(
pMnode
,
pVgid
->
dnodeId
);
if
(
pDnode
==
NULL
)
return
-
1
;
action
.
epSet
=
mndGetDnodeEpset
(
pDnode
);
mndReleaseDnode
(
pMnode
,
pDnode
);
SAlterVnodeMsg
*
pMsg
=
(
SAlterVnodeMsg
*
)
mndBuildCreateVnodeMsg
(
pMnode
,
pDnode
,
pDb
,
pVgroup
);
if
(
pMsg
==
NULL
)
return
-
1
;
action
.
pCont
=
pMsg
;
action
.
contLen
=
sizeof
(
SAlterVnodeMsg
);
action
.
msgType
=
TSDB_MSG_TYPE_ALTER_VNODE_IN
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
free
(
pMsg
);
return
-
1
;
}
}
return
0
;
return
0
;
}
}
static
int32_t
mndSetUpdateDbCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pOldDb
,
SDbObj
*
pNewDb
)
{
return
0
;
}
static
int32_t
mndSetUpdateDbRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pOldDb
,
SDbObj
*
pNewDb
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
void
*
pIter
=
NULL
;
static
int32_t
mndSetUpdateDbRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pOldDb
,
SDbObj
*
pNewDb
)
{
return
0
;
}
while
(
1
)
{
SVgObj
*
pVgroup
=
NULL
;
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
if
(
pIter
==
NULL
)
break
;
static
int32_t
mndSetUpdateDbUndoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pOldDb
,
SDbObj
*
pNewDb
)
{
return
0
;
}
if
(
pVgroup
->
dbUid
==
pNewDb
->
uid
)
{
if
(
mndBuildUpdateVgroupAction
(
pMnode
,
pTrans
,
pNewDb
,
pVgroup
)
!=
0
)
{
sdbCancelFetch
(
pSdb
,
pIter
);
sdbRelease
(
pSdb
,
pVgroup
);
return
-
1
;
}
}
sdbRelease
(
pSdb
,
pVgroup
);
}
return
0
;
}
static
int32_t
mndUpdateDb
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SDbObj
*
pOldDb
,
SDbObj
*
pNewDb
)
{
static
int32_t
mndUpdateDb
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SDbObj
*
pOldDb
,
SDbObj
*
pNewDb
)
{
int32_t
code
=
-
1
;
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_R
OLLBACK
,
pMsg
->
rpcMsg
.
handle
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_R
ETRY
,
pMsg
->
rpcMsg
.
handle
);
if
(
pTrans
==
NULL
)
{
if
(
pTrans
==
NULL
)
{
mError
(
"db:%s, failed to update since %s"
,
pOldDb
->
name
,
terrstr
());
mError
(
"db:%s, failed to update since %s"
,
pOldDb
->
name
,
terrstr
());
return
terrno
;
return
terrno
;
...
@@ -579,11 +621,6 @@ static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbO
...
@@ -579,11 +621,6 @@ static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbO
goto
UPDATE_DB_OVER
;
goto
UPDATE_DB_OVER
;
}
}
if
(
mndSetUpdateDbUndoLogs
(
pMnode
,
pTrans
,
pOldDb
,
pNewDb
)
!=
0
)
{
mError
(
"trans:%d, failed to set undo log since %s"
,
pTrans
->
id
,
terrstr
());
goto
UPDATE_DB_OVER
;
}
if
(
mndSetUpdateDbCommitLogs
(
pMnode
,
pTrans
,
pOldDb
,
pNewDb
)
!=
0
)
{
if
(
mndSetUpdateDbCommitLogs
(
pMnode
,
pTrans
,
pOldDb
,
pNewDb
)
!=
0
)
{
mError
(
"trans:%d, failed to set commit log since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to set commit log since %s"
,
pTrans
->
id
,
terrstr
());
goto
UPDATE_DB_OVER
;
goto
UPDATE_DB_OVER
;
...
@@ -594,11 +631,6 @@ static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbO
...
@@ -594,11 +631,6 @@ static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbO
goto
UPDATE_DB_OVER
;
goto
UPDATE_DB_OVER
;
}
}
if
(
mndSetUpdateDbUndoActions
(
pMnode
,
pTrans
,
pOldDb
,
pNewDb
)
!=
0
)
{
mError
(
"trans:%d, failed to set redo actions since %s"
,
pTrans
->
id
,
terrstr
());
goto
UPDATE_DB_OVER
;
}
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
goto
UPDATE_DB_OVER
;
goto
UPDATE_DB_OVER
;
...
@@ -660,31 +692,87 @@ static int32_t mndSetDropDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb)
...
@@ -660,31 +692,87 @@ static int32_t mndSetDropDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb)
return
0
;
return
0
;
}
}
static
int32_t
mndSetDropDbUndoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
)
{
SSdbRaw
*
pUndoRaw
=
mndDbActionEncode
(
pDb
);
if
(
pUndoRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendUndolog
(
pTrans
,
pUndoRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pUndoRaw
,
SDB_STATUS_READY
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetDropDbCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
)
{
static
int32_t
mndSetDropDbCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
)
{
SSdbRaw
*
pCommitRaw
=
mndDbActionEncode
(
pDb
);
SSdbRaw
*
pCommitRaw
=
mndDbActionEncode
(
pDb
);
if
(
pCommitRaw
==
NULL
)
return
-
1
;
if
(
pCommitRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
return
-
1
;
if
(
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
)
!=
0
)
return
-
1
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
void
*
pIter
=
NULL
;
while
(
1
)
{
SVgObj
*
pVgroup
=
NULL
;
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
if
(
pIter
==
NULL
)
break
;
if
(
pVgroup
->
dbUid
==
pDb
->
uid
)
{
SSdbRaw
*
pVgRaw
=
mndVgroupActionEncode
(
pVgroup
);
if
(
pVgRaw
==
NULL
||
mndTransAppendCommitlog
(
pTrans
,
pVgRaw
)
!=
0
)
{
sdbCancelFetch
(
pSdb
,
pIter
);
sdbRelease
(
pSdb
,
pVgroup
);
return
-
1
;
}
sdbSetRawStatus
(
pVgRaw
,
SDB_STATUS_DROPPED
);
}
sdbRelease
(
pSdb
,
pVgroup
);
}
return
0
;
}
static
int32_t
mndBuildDropVgroupAction
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
)
{
for
(
int32_t
vn
=
0
;
vn
<
pVgroup
->
replica
;
++
vn
)
{
STransAction
action
=
{
0
};
SVnodeGid
*
pVgid
=
pVgroup
->
vnodeGid
+
vn
;
SDnodeObj
*
pDnode
=
mndAcquireDnode
(
pMnode
,
pVgid
->
dnodeId
);
if
(
pDnode
==
NULL
)
return
-
1
;
action
.
epSet
=
mndGetDnodeEpset
(
pDnode
);
mndReleaseDnode
(
pMnode
,
pDnode
);
SDropVnodeMsg
*
pMsg
=
mndBuildDropVnodeMsg
(
pMnode
,
pDnode
,
pDb
,
pVgroup
);
if
(
pMsg
==
NULL
)
return
-
1
;
action
.
pCont
=
pMsg
;
action
.
contLen
=
sizeof
(
SCreateVnodeMsg
);
action
.
msgType
=
TSDB_MSG_TYPE_DROP_VNODE_IN
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
free
(
pMsg
);
return
-
1
;
}
}
return
0
;
return
0
;
}
}
static
int32_t
mndSetDropDbRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
)
{
return
0
;
}
static
int32_t
mndSetDropDbRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
void
*
pIter
=
NULL
;
static
int32_t
mndSetDropDbUndoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
)
{
return
0
;
}
while
(
1
)
{
SVgObj
*
pVgroup
=
NULL
;
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
if
(
pIter
==
NULL
)
break
;
if
(
pVgroup
->
dbUid
==
pDb
->
uid
)
{
if
(
mndBuildDropVgroupAction
(
pMnode
,
pTrans
,
pDb
,
pVgroup
)
!=
0
)
{
sdbCancelFetch
(
pSdb
,
pIter
);
sdbRelease
(
pSdb
,
pVgroup
);
return
-
1
;
}
}
sdbRelease
(
pSdb
,
pVgroup
);
}
return
0
;
}
static
int32_t
mndDropDb
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SDbObj
*
pDb
)
{
static
int32_t
mndDropDb
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SDbObj
*
pDb
)
{
int32_t
code
=
-
1
;
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_R
OLLBACK
,
pMsg
->
rpcMsg
.
handle
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_R
ETRY
,
pMsg
->
rpcMsg
.
handle
);
if
(
pTrans
==
NULL
)
{
if
(
pTrans
==
NULL
)
{
mError
(
"db:%s, failed to drop since %s"
,
pDb
->
name
,
terrstr
());
mError
(
"db:%s, failed to drop since %s"
,
pDb
->
name
,
terrstr
());
return
-
1
;
return
-
1
;
...
@@ -697,11 +785,6 @@ static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) {
...
@@ -697,11 +785,6 @@ static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) {
goto
DROP_DB_OVER
;
goto
DROP_DB_OVER
;
}
}
if
(
mndSetDropDbUndoLogs
(
pMnode
,
pTrans
,
pDb
)
!=
0
)
{
mError
(
"trans:%d, failed to set undo log since %s"
,
pTrans
->
id
,
terrstr
());
goto
DROP_DB_OVER
;
}
if
(
mndSetDropDbCommitLogs
(
pMnode
,
pTrans
,
pDb
)
!=
0
)
{
if
(
mndSetDropDbCommitLogs
(
pMnode
,
pTrans
,
pDb
)
!=
0
)
{
mError
(
"trans:%d, failed to set commit log since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to set commit log since %s"
,
pTrans
->
id
,
terrstr
());
goto
DROP_DB_OVER
;
goto
DROP_DB_OVER
;
...
@@ -712,11 +795,6 @@ static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) {
...
@@ -712,11 +795,6 @@ static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) {
goto
DROP_DB_OVER
;
goto
DROP_DB_OVER
;
}
}
if
(
mndSetDropDbUndoActions
(
pMnode
,
pTrans
,
pDb
)
!=
0
)
{
mError
(
"trans:%d, failed to set redo actions since %s"
,
pTrans
->
id
,
terrstr
());
goto
DROP_DB_OVER
;
}
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
goto
DROP_DB_OVER
;
goto
DROP_DB_OVER
;
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
bed4c349
...
@@ -484,6 +484,15 @@ static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) {
...
@@ -484,6 +484,15 @@ static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) {
}
}
}
}
//topic should have different name with stb
SStbObj
*
pTopic
=
mndAcquireStb
(
pMnode
,
pCreate
->
name
);
if
(
pTopic
!=
NULL
)
{
sdbRelease
(
pMnode
->
pSdb
,
pTopic
);
terrno
=
TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC
;
mError
(
"stb:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
}
SDbObj
*
pDb
=
mndAcquireDbByStb
(
pMnode
,
pCreate
->
name
);
SDbObj
*
pDb
=
mndAcquireDbByStb
(
pMnode
,
pCreate
->
name
);
if
(
pDb
==
NULL
)
{
if
(
pDb
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DB_NOT_SELECTED
;
terrno
=
TSDB_CODE_MND_DB_NOT_SELECTED
;
...
@@ -891,4 +900,4 @@ static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3
...
@@ -891,4 +900,4 @@ static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3
static
void
mndCancelGetNextStb
(
SMnode
*
pMnode
,
void
*
pIter
)
{
static
void
mndCancelGetNextStb
(
SMnode
*
pMnode
,
void
*
pIter
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbCancelFetch
(
pSdb
,
pIter
);
sdbCancelFetch
(
pSdb
,
pIter
);
}
}
\ No newline at end of file
source/dnode/mnode/impl/src/mndTopic.c
0 → 100644
浏览文件 @
bed4c349
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mndStb.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndShow.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tname.h"
#define TSDB_TOPIC_VER_NUMBER 1
#define TSDB_TOPIC_RESERVE_SIZE 64
static
SSdbRaw
*
mndTopicActionEncode
(
STopicObj
*
pTopic
);
static
SSdbRow
*
mndTopicActionDecode
(
SSdbRaw
*
pRaw
);
static
int32_t
mndTopicActionInsert
(
SSdb
*
pSdb
,
STopicObj
*
pTopic
);
static
int32_t
mndTopicActionDelete
(
SSdb
*
pSdb
,
STopicObj
*
pTopic
);
static
int32_t
mndTopicActionUpdate
(
SSdb
*
pSdb
,
STopicObj
*
pTopic
,
STopicObj
*
pNewTopic
);
static
int32_t
mndProcessCreateTopicMsg
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessAlterTopicMsg
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessDropTopicMsg
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessCreateTopicInRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessAlterTopicInRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessDropTopicInRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessTopicMetaMsg
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndGetTopicMeta
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
STableMetaMsg
*
pMeta
);
static
int32_t
mndRetrieveTopic
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
);
static
void
mndCancelGetNextTopic
(
SMnode
*
pMnode
,
void
*
pIter
);
int32_t
mndInitTopic
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_TOPIC
,
.
keyType
=
SDB_KEY_BINARY
,
.
encodeFp
=
(
SdbEncodeFp
)
mndTopicActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndTopicActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndTopicActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndTopicActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndTopicActionDelete
};
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_CREATE_TOPIC
,
mndProcessCreateTopicMsg
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_ALTER_TOPIC
,
mndProcessAlterTopicMsg
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_DROP_TOPIC
,
mndProcessDropTopicMsg
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_CREATE_TOPIC_IN_RSP
,
mndProcessCreateTopicInRsp
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_ALTER_TOPIC_IN_RSP
,
mndProcessAlterTopicInRsp
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_DROP_TOPIC_IN_RSP
,
mndProcessDropTopicInRsp
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_TABLE_META
,
mndProcessTopicMetaMsg
);
/*mndAddShowMetaHandle(pMnode, TSDB_MGMT_TOPIC, mndGetTopicMeta);*/
/*mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TOPIC, mndRetrieveTopic);*/
/*mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TOPIC, mndCancelGetNextTopic);*/
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_CREATE_TOPIC
,
mndProcessCreateTopicMsg
);
mndSetMsgHandle
(
pMnode
,
TSDB_MSG_TYPE_CREATE_TOPIC_IN_RSP
,
mndProcessCreateTopicInRsp
);
return
sdbSetTable
(
pMnode
->
pSdb
,
table
);
}
void
mndCleanupTopic
(
SMnode
*
pMnode
)
{}
static
SSdbRaw
*
mndTopicActionEncode
(
STopicObj
*
pTopic
)
{
int32_t
size
=
sizeof
(
STopicObj
)
+
TSDB_TOPIC_RESERVE_SIZE
;
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_TOPIC
,
TSDB_TOPIC_VER_NUMBER
,
size
);
if
(
pRaw
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
name
,
TSDB_TABLE_FNAME_LEN
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
db
,
TSDB_FULL_DB_NAME_LEN
);
SDB_SET_INT64
(
pRaw
,
dataPos
,
pTopic
->
createTime
);
SDB_SET_INT64
(
pRaw
,
dataPos
,
pTopic
->
updateTime
);
SDB_SET_INT64
(
pRaw
,
dataPos
,
pTopic
->
uid
);
SDB_SET_INT64
(
pRaw
,
dataPos
,
pTopic
->
dbUid
);
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTopic
->
version
);
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTopic
->
execLen
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
executor
,
pTopic
->
execLen
);
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTopic
->
sqlLen
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
sql
,
pTopic
->
sqlLen
);
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
TSDB_TOPIC_RESERVE_SIZE
);
SDB_SET_DATALEN
(
pRaw
,
dataPos
);
return
pRaw
;
}
static
SSdbRow
*
mndTopicActionDecode
(
SSdbRaw
*
pRaw
)
{
int8_t
sver
=
0
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
return
NULL
;
if
(
sver
!=
TSDB_TOPIC_VER_NUMBER
)
{
mError
(
"failed to decode stable since %s"
,
terrstr
());
terrno
=
TSDB_CODE_SDB_INVALID_DATA_VER
;
return
NULL
;
}
int32_t
size
=
sizeof
(
STopicObj
)
+
TSDB_MAX_COLUMNS
*
sizeof
(
SSchema
);
SSdbRow
*
pRow
=
sdbAllocRow
(
size
);
STopicObj
*
pTopic
=
sdbGetRowObj
(
pRow
);
if
(
pTopic
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
pTopic
->
name
,
TSDB_TABLE_FNAME_LEN
);
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
pTopic
->
db
,
TSDB_FULL_DB_NAME_LEN
);
SDB_GET_INT64
(
pRaw
,
pRow
,
dataPos
,
&
pTopic
->
createTime
);
SDB_GET_INT64
(
pRaw
,
pRow
,
dataPos
,
&
pTopic
->
updateTime
);
SDB_GET_INT64
(
pRaw
,
pRow
,
dataPos
,
&
pTopic
->
uid
);
SDB_GET_INT64
(
pRaw
,
pRow
,
dataPos
,
&
pTopic
->
dbUid
);
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pTopic
->
version
);
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pTopic
->
execLen
);
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
pTopic
->
executor
,
pTopic
->
execLen
);
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pTopic
->
sqlLen
);
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
pTopic
->
sql
,
pTopic
->
sqlLen
);
SDB_GET_RESERVE
(
pRaw
,
pRow
,
dataPos
,
TSDB_TOPIC_RESERVE_SIZE
);
return
pRow
;
}
static
int32_t
mndTopicActionInsert
(
SSdb
*
pSdb
,
STopicObj
*
pTopic
)
{
mTrace
(
"topic:%s, perform insert action"
,
pTopic
->
name
);
return
0
;
}
static
int32_t
mndTopicActionDelete
(
SSdb
*
pSdb
,
STopicObj
*
pTopic
)
{
mTrace
(
"topic:%s, perform delete action"
,
pTopic
->
name
);
return
0
;
}
static
int32_t
mndTopicActionUpdate
(
SSdb
*
pSdb
,
STopicObj
*
pOldTopic
,
STopicObj
*
pNewTopic
)
{
mTrace
(
"topic:%s, perform update action"
,
pOldTopic
->
name
);
atomic_exchange_32
(
&
pOldTopic
->
updateTime
,
pNewTopic
->
updateTime
);
atomic_exchange_32
(
&
pOldTopic
->
version
,
pNewTopic
->
version
);
taosWLockLatch
(
&
pOldTopic
->
lock
);
#if 0
pOldTopic->numOfColumns = pNewTopic->numOfColumns;
pOldTopic->numOfTags = pNewTopic->numOfTags;
int32_t totalCols = pNewTopic->numOfTags + pNewTopic->numOfColumns;
int32_t totalSize = totalCols * sizeof(SSchema);
if (pOldTopic->numOfTags + pOldTopic->numOfColumns < totalCols) {
void *pSchema = malloc(totalSize);
if (pSchema != NULL) {
free(pOldTopic->pSchema);
pOldTopic->pSchema = pSchema;
}
}
memcpy(pOldTopic->pSchema, pNewTopic->pSchema, totalSize);
#endif
taosWUnLockLatch
(
&
pOldTopic
->
lock
);
return
0
;
}
STopicObj
*
mndAcquireTopic
(
SMnode
*
pMnode
,
char
*
topicName
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
STopicObj
*
pTopic
=
sdbAcquire
(
pSdb
,
SDB_TOPIC
,
topicName
);
if
(
pTopic
==
NULL
)
{
terrno
=
TSDB_CODE_MND_TOPIC_NOT_EXIST
;
}
return
pTopic
;
}
void
mndReleaseTopic
(
SMnode
*
pMnode
,
STopicObj
*
pTopic
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbRelease
(
pSdb
,
pTopic
);
}
static
SDbObj
*
mndAcquireDbByTopic
(
SMnode
*
pMnode
,
char
*
topicName
)
{
SName
name
=
{
0
};
tNameFromString
(
&
name
,
topicName
,
T_NAME_ACCT
|
T_NAME_DB
|
T_NAME_TOPIC
);
char
db
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
tNameGetFullDbName
(
&
name
,
db
);
return
mndAcquireDb
(
pMnode
,
db
);
}
static
SCreateTopicInternalMsg
*
mndBuildCreateTopicMsg
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
,
STopicObj
*
pTopic
)
{
int32_t
totalCols
=
0
;
int32_t
contLen
=
sizeof
(
SCreateTopicInternalMsg
)
+
pTopic
->
execLen
+
pTopic
->
sqlLen
;
SCreateTopicInternalMsg
*
pCreate
=
calloc
(
1
,
contLen
);
if
(
pCreate
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
pCreate
->
head
.
contLen
=
htonl
(
contLen
);
pCreate
->
head
.
vgId
=
htonl
(
pVgroup
->
vgId
);
memcpy
(
pCreate
->
name
,
pTopic
->
name
,
TSDB_TABLE_FNAME_LEN
);
pCreate
->
tuid
=
htobe64
(
pTopic
->
uid
);
pCreate
->
sverson
=
htonl
(
pTopic
->
version
);
pCreate
->
sql
=
malloc
(
pTopic
->
sqlLen
);
if
(
pCreate
->
sql
==
NULL
)
{
free
(
pCreate
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
memcpy
(
pCreate
->
sql
,
pTopic
->
sql
,
pTopic
->
sqlLen
);
pCreate
->
executor
=
malloc
(
pTopic
->
execLen
);
if
(
pCreate
->
executor
==
NULL
)
{
free
(
pCreate
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
memcpy
(
pCreate
->
executor
,
pTopic
->
executor
,
pTopic
->
execLen
);
return
pCreate
;
}
static
SDropTopicInternalMsg
*
mndBuildDropTopicMsg
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
,
STopicObj
*
pTopic
)
{
int32_t
contLen
=
sizeof
(
SDropTopicInternalMsg
);
SDropTopicInternalMsg
*
pDrop
=
calloc
(
1
,
contLen
);
if
(
pDrop
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
pDrop
->
head
.
contLen
=
htonl
(
contLen
);
pDrop
->
head
.
vgId
=
htonl
(
pVgroup
->
vgId
);
memcpy
(
pDrop
->
name
,
pTopic
->
name
,
TSDB_TABLE_FNAME_LEN
);
pDrop
->
tuid
=
htobe64
(
pTopic
->
uid
);
return
pDrop
;
}
static
int32_t
mndCheckCreateTopicMsg
(
SCreateTopicMsg
*
pCreate
)
{
//deserialize and other stuff
return
0
;
}
static
int32_t
mndSetCreateTopicRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
STopicObj
*
pTopic
)
{
SSdbRaw
*
pRedoRaw
=
mndTopicActionEncode
(
pTopic
);
if
(
pRedoRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendRedolog
(
pTrans
,
pRedoRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_CREATING
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetCreateTopicUndoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
STopicObj
*
pTopic
)
{
SSdbRaw
*
pUndoRaw
=
mndTopicActionEncode
(
pTopic
);
if
(
pUndoRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendUndolog
(
pTrans
,
pUndoRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pUndoRaw
,
SDB_STATUS_DROPPED
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetCreateTopicCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
STopicObj
*
pTopic
)
{
SSdbRaw
*
pCommitRaw
=
mndTopicActionEncode
(
pTopic
);
if
(
pCommitRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_READY
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetCreateTopicRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
STopicObj
*
pTopic
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SVgObj
*
pVgroup
=
NULL
;
void
*
pIter
=
NULL
;
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
if
(
pIter
==
NULL
)
break
;
if
(
pVgroup
->
dbUid
!=
pDb
->
uid
)
continue
;
SCreateTopicInternalMsg
*
pMsg
=
mndBuildCreateTopicMsg
(
pMnode
,
pVgroup
,
pTopic
);
if
(
pMsg
==
NULL
)
{
sdbCancelFetch
(
pSdb
,
pIter
);
sdbRelease
(
pSdb
,
pVgroup
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
STransAction
action
=
{
0
};
action
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
action
.
pCont
=
pMsg
;
action
.
contLen
=
htonl
(
pMsg
->
head
.
contLen
);
action
.
msgType
=
TSDB_MSG_TYPE_CREATE_TOPIC_IN
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
free
(
pMsg
);
sdbCancelFetch
(
pSdb
,
pIter
);
sdbRelease
(
pSdb
,
pVgroup
);
return
-
1
;
}
sdbRelease
(
pSdb
,
pVgroup
);
}
return
0
;
}
static
int32_t
mndSetCreateTopicUndoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
STopicObj
*
pTopic
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SVgObj
*
pVgroup
=
NULL
;
void
*
pIter
=
NULL
;
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
if
(
pIter
==
NULL
)
break
;
if
(
pVgroup
->
dbUid
!=
pDb
->
uid
)
continue
;
SDropTopicInternalMsg
*
pMsg
=
mndBuildDropTopicMsg
(
pMnode
,
pVgroup
,
pTopic
);
if
(
pMsg
==
NULL
)
{
sdbCancelFetch
(
pSdb
,
pIter
);
sdbRelease
(
pSdb
,
pVgroup
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
STransAction
action
=
{
0
};
action
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
action
.
pCont
=
pMsg
;
action
.
contLen
=
sizeof
(
SDropTopicInternalMsg
);
action
.
msgType
=
TSDB_MSG_TYPE_DROP_TOPIC_IN
;
if
(
mndTransAppendUndoAction
(
pTrans
,
&
action
)
!=
0
)
{
free
(
pMsg
);
sdbCancelFetch
(
pSdb
,
pIter
);
sdbRelease
(
pSdb
,
pVgroup
);
return
-
1
;
}
sdbRelease
(
pSdb
,
pVgroup
);
}
return
0
;
}
static
int32_t
mndCreateTopic
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SCreateTopicMsg
*
pCreate
,
SDbObj
*
pDb
)
{
STopicObj
topicObj
=
{
0
};
tstrncpy
(
topicObj
.
name
,
pCreate
->
name
,
TSDB_TABLE_FNAME_LEN
);
tstrncpy
(
topicObj
.
db
,
pDb
->
name
,
TSDB_FULL_DB_NAME_LEN
);
topicObj
.
createTime
=
taosGetTimestampMs
();
topicObj
.
updateTime
=
topicObj
.
createTime
;
topicObj
.
uid
=
mndGenerateUid
(
pCreate
->
name
,
TSDB_TABLE_FNAME_LEN
);
topicObj
.
dbUid
=
pDb
->
uid
;
topicObj
.
version
=
1
;
#if 0
int32_t totalCols = topicObj.numOfColumns + topicObj.numOfTags;
int32_t totalSize = totalCols * sizeof(SSchema);
topicObj.sql = malloc(totalSize);
if (topicObj.sql == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
memcpy(topicObj.sql, pCreate->sql, totalSize);
#endif
int32_t
code
=
0
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
pMsg
->
rpcMsg
.
handle
);
if
(
pTrans
==
NULL
)
{
mError
(
"topic:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
}
mDebug
(
"trans:%d, used to create topic:%s"
,
pTrans
->
id
,
pCreate
->
name
);
if
(
mndSetCreateTopicRedoLogs
(
pMnode
,
pTrans
,
pDb
,
&
topicObj
)
!=
0
)
{
mError
(
"trans:%d, failed to set redo log since %s"
,
pTrans
->
id
,
terrstr
());
goto
CREATE_TOPIC_OVER
;
}
if
(
mndSetCreateTopicUndoLogs
(
pMnode
,
pTrans
,
pDb
,
&
topicObj
)
!=
0
)
{
mError
(
"trans:%d, failed to set undo log since %s"
,
pTrans
->
id
,
terrstr
());
goto
CREATE_TOPIC_OVER
;
}
if
(
mndSetCreateTopicCommitLogs
(
pMnode
,
pTrans
,
pDb
,
&
topicObj
)
!=
0
)
{
mError
(
"trans:%d, failed to set commit log since %s"
,
pTrans
->
id
,
terrstr
());
goto
CREATE_TOPIC_OVER
;
}
if
(
mndSetCreateTopicRedoActions
(
pMnode
,
pTrans
,
pDb
,
&
topicObj
)
!=
0
)
{
mError
(
"trans:%d, failed to set redo actions since %s"
,
pTrans
->
id
,
terrstr
());
goto
CREATE_TOPIC_OVER
;
}
if
(
mndSetCreateTopicUndoActions
(
pMnode
,
pTrans
,
pDb
,
&
topicObj
)
!=
0
)
{
mError
(
"trans:%d, failed to set redo actions since %s"
,
pTrans
->
id
,
terrstr
());
goto
CREATE_TOPIC_OVER
;
}
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
}
code
=
0
;
CREATE_TOPIC_OVER:
mndTransDrop
(
pTrans
);
return
code
;
}
static
int32_t
mndProcessCreateTopicMsg
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SCreateTopicMsg
*
pCreate
=
pMsg
->
rpcMsg
.
pCont
;
mDebug
(
"topic:%s, start to create"
,
pCreate
->
name
);
if
(
mndCheckCreateTopicMsg
(
pCreate
)
!=
0
)
{
mError
(
"topic:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
}
STopicObj
*
pTopic
=
mndAcquireTopic
(
pMnode
,
pCreate
->
name
);
if
(
pTopic
!=
NULL
)
{
sdbRelease
(
pMnode
->
pSdb
,
pTopic
);
if
(
pCreate
->
igExists
)
{
mDebug
(
"topic:%s, already exist, ignore exist is set"
,
pCreate
->
name
);
return
0
;
}
else
{
terrno
=
TSDB_CODE_MND_TOPIC_ALREADY_EXIST
;
mError
(
"db:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
}
}
//topic should have different name with stb
SStbObj
*
pStb
=
mndAcquireStb
(
pMnode
,
pCreate
->
name
);
if
(
pStb
!=
NULL
)
{
sdbRelease
(
pMnode
->
pSdb
,
pStb
);
terrno
=
TSDB_CODE_MND_NAME_CONFLICT_WITH_STB
;
mError
(
"topic:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
}
SDbObj
*
pDb
=
mndAcquireDbByTopic
(
pMnode
,
pCreate
->
name
);
if
(
pDb
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DB_NOT_SELECTED
;
mError
(
"topic:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
}
int32_t
code
=
mndCreateTopic
(
pMnode
,
pMsg
,
pCreate
,
pDb
);
mndReleaseDb
(
pMnode
,
pDb
);
if
(
code
!=
0
)
{
terrno
=
code
;
mError
(
"topic:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
}
return
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
}
static
int32_t
mndCheckAlterTopicMsg
(
SAlterTopicMsg
*
pAlter
)
{
SSchema
*
pSchema
=
&
pAlter
->
schema
;
pSchema
->
colId
=
htonl
(
pSchema
->
colId
);
pSchema
->
bytes
=
htonl
(
pSchema
->
bytes
);
if
(
pSchema
->
type
<=
0
)
{
terrno
=
TSDB_CODE_MND_INVALID_TOPIC_OPTION
;
return
-
1
;
}
if
(
pSchema
->
colId
<
0
||
pSchema
->
colId
>=
(
TSDB_MAX_COLUMNS
+
TSDB_MAX_TAGS
))
{
terrno
=
TSDB_CODE_MND_INVALID_TOPIC_OPTION
;
return
-
1
;
}
if
(
pSchema
->
bytes
<=
0
)
{
terrno
=
TSDB_CODE_MND_INVALID_TOPIC_OPTION
;
return
-
1
;
}
if
(
pSchema
->
name
[
0
]
==
0
)
{
terrno
=
TSDB_CODE_MND_INVALID_TOPIC_OPTION
;
return
-
1
;
}
return
0
;
}
static
int32_t
mndUpdateTopic
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
STopicObj
*
pOldTopic
,
STopicObj
*
pNewTopic
)
{
return
0
;
}
static
int32_t
mndProcessAlterTopicMsg
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SAlterTopicMsg
*
pAlter
=
pMsg
->
rpcMsg
.
pCont
;
mDebug
(
"topic:%s, start to alter"
,
pAlter
->
name
);
if
(
mndCheckAlterTopicMsg
(
pAlter
)
!=
0
)
{
mError
(
"topic:%s, failed to alter since %s"
,
pAlter
->
name
,
terrstr
());
return
-
1
;
}
STopicObj
*
pTopic
=
mndAcquireTopic
(
pMnode
,
pAlter
->
name
);
if
(
pTopic
==
NULL
)
{
terrno
=
TSDB_CODE_MND_TOPIC_NOT_EXIST
;
mError
(
"topic:%s, failed to alter since %s"
,
pAlter
->
name
,
terrstr
());
return
-
1
;
}
STopicObj
topicObj
=
{
0
};
memcpy
(
&
topicObj
,
pTopic
,
sizeof
(
STopicObj
));
int32_t
code
=
mndUpdateTopic
(
pMnode
,
pMsg
,
pTopic
,
&
topicObj
);
mndReleaseTopic
(
pMnode
,
pTopic
);
if
(
code
!=
0
)
{
mError
(
"topic:%s, failed to alter since %s"
,
pAlter
->
name
,
tstrerror
(
code
));
return
code
;
}
return
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
}
static
int32_t
mndProcessAlterTopicInRsp
(
SMnodeMsg
*
pMsg
)
{
mndTransHandleActionRsp
(
pMsg
);
return
0
;
}
static
int32_t
mndSetDropTopicRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
STopicObj
*
pTopic
)
{
SSdbRaw
*
pRedoRaw
=
mndTopicActionEncode
(
pTopic
);
if
(
pRedoRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendRedolog
(
pTrans
,
pRedoRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_DROPPING
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetDropTopicUndoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
STopicObj
*
pTopic
)
{
SSdbRaw
*
pUndoRaw
=
mndTopicActionEncode
(
pTopic
);
if
(
pUndoRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendUndolog
(
pTrans
,
pUndoRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pUndoRaw
,
SDB_STATUS_READY
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetDropTopicCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
STopicObj
*
pTopic
)
{
SSdbRaw
*
pCommitRaw
=
mndTopicActionEncode
(
pTopic
);
if
(
pCommitRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetDropTopicRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
STopicObj
*
pTopic
)
{
return
0
;
}
static
int32_t
mndSetDropTopicUndoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
STopicObj
*
pTopic
)
{
return
0
;
}
static
int32_t
mndDropTopic
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
STopicObj
*
pTopic
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
pMsg
->
rpcMsg
.
handle
);
if
(
pTrans
==
NULL
)
{
mError
(
"topic:%s, failed to drop since %s"
,
pTopic
->
name
,
terrstr
());
return
-
1
;
}
mDebug
(
"trans:%d, used to drop topic:%s"
,
pTrans
->
id
,
pTopic
->
name
);
if
(
mndSetDropTopicRedoLogs
(
pMnode
,
pTrans
,
pTopic
)
!=
0
)
{
mError
(
"trans:%d, failed to set redo log since %s"
,
pTrans
->
id
,
terrstr
());
goto
DROP_TOPIC_OVER
;
}
if
(
mndSetDropTopicUndoLogs
(
pMnode
,
pTrans
,
pTopic
)
!=
0
)
{
mError
(
"trans:%d, failed to set undo log since %s"
,
pTrans
->
id
,
terrstr
());
goto
DROP_TOPIC_OVER
;
}
if
(
mndSetDropTopicCommitLogs
(
pMnode
,
pTrans
,
pTopic
)
!=
0
)
{
mError
(
"trans:%d, failed to set commit log since %s"
,
pTrans
->
id
,
terrstr
());
goto
DROP_TOPIC_OVER
;
}
if
(
mndSetDropTopicRedoActions
(
pMnode
,
pTrans
,
pTopic
)
!=
0
)
{
mError
(
"trans:%d, failed to set redo actions since %s"
,
pTrans
->
id
,
terrstr
());
goto
DROP_TOPIC_OVER
;
}
if
(
mndSetDropTopicUndoActions
(
pMnode
,
pTrans
,
pTopic
)
!=
0
)
{
mError
(
"trans:%d, failed to set redo actions since %s"
,
pTrans
->
id
,
terrstr
());
goto
DROP_TOPIC_OVER
;
}
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
goto
DROP_TOPIC_OVER
;
}
code
=
0
;
DROP_TOPIC_OVER:
mndTransDrop
(
pTrans
);
return
0
;
}
static
int32_t
mndProcessDropTopicMsg
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SDropTopicMsg
*
pDrop
=
pMsg
->
rpcMsg
.
pCont
;
mDebug
(
"topic:%s, start to drop"
,
pDrop
->
name
);
STopicObj
*
pTopic
=
mndAcquireTopic
(
pMnode
,
pDrop
->
name
);
if
(
pTopic
==
NULL
)
{
if
(
pDrop
->
igNotExists
)
{
mDebug
(
"topic:%s, not exist, ignore not exist is set"
,
pDrop
->
name
);
return
0
;
}
else
{
terrno
=
TSDB_CODE_MND_TOPIC_NOT_EXIST
;
mError
(
"topic:%s, failed to drop since %s"
,
pDrop
->
name
,
terrstr
());
return
-
1
;
}
}
int32_t
code
=
mndDropTopic
(
pMnode
,
pMsg
,
pTopic
);
mndReleaseTopic
(
pMnode
,
pTopic
);
if
(
code
!=
0
)
{
terrno
=
code
;
mError
(
"topic:%s, failed to drop since %s"
,
pDrop
->
name
,
terrstr
());
return
-
1
;
}
return
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
}
static
int32_t
mndProcessDropTopicInRsp
(
SMnodeMsg
*
pMsg
)
{
mndTransHandleActionRsp
(
pMsg
);
return
0
;
}
static
int32_t
mndProcessTopicMetaMsg
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
STableInfoMsg
*
pInfo
=
pMsg
->
rpcMsg
.
pCont
;
mDebug
(
"topic:%s, start to retrieve meta"
,
pInfo
->
tableFname
);
#if 0
SDbObj *pDb = mndAcquireDbByTopic(pMnode, pInfo->tableFname);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
mError("topic:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr());
return -1;
}
STopicObj *pTopic = mndAcquireTopic(pMnode, pInfo->tableFname);
if (pTopic == NULL) {
mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_MND_INVALID_TOPIC;
mError("topic:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
return -1;
}
taosRLockLatch(&pTopic->lock);
int32_t totalCols = pTopic->numOfColumns + pTopic->numOfTags;
int32_t contLen = sizeof(STableMetaMsg) + totalCols * sizeof(SSchema);
STableMetaMsg *pMeta = rpcMallocCont(contLen);
if (pMeta == NULL) {
taosRUnLockLatch(&pTopic->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseTopic(pMnode, pTopic);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("topic:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
return -1;
}
memcpy(pMeta->topicFname, pTopic->name, TSDB_TABLE_FNAME_LEN);
pMeta->numOfTags = htonl(pTopic->numOfTags);
pMeta->numOfColumns = htonl(pTopic->numOfColumns);
pMeta->precision = pDb->cfg.precision;
pMeta->tableType = TSDB_SUPER_TABLE;
pMeta->update = pDb->cfg.update;
pMeta->sversion = htonl(pTopic->version);
pMeta->tuid = htonl(pTopic->uid);
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pMeta->pSchema[i];
SSchema *pSrcSchema = &pTopic->pSchema[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = htonl(pSrcSchema->colId);
pSchema->bytes = htonl(pSrcSchema->bytes);
}
taosRUnLockLatch(&pTopic->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseTopic(pMnode, pTopic);
pMsg->pCont = pMeta;
pMsg->contLen = contLen;
mDebug("topic:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pTopic->numOfColumns, pTopic->numOfTags);
#endif
return
0
;
}
static
int32_t
mndProcessCreateTopicInRsp
(
SMnodeMsg
*
pMsg
)
{
mndTransHandleActionRsp
(
pMsg
);
return
0
;
}
static
int32_t
mndGetNumOfTopics
(
SMnode
*
pMnode
,
char
*
dbName
,
int32_t
*
pNumOfTopics
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
dbName
);
if
(
pDb
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DB_NOT_SELECTED
;
return
-
1
;
}
int32_t
numOfTopics
=
0
;
void
*
pIter
=
NULL
;
while
(
1
)
{
STopicObj
*
pTopic
=
NULL
;
pIter
=
sdbFetch
(
pSdb
,
SDB_TOPIC
,
pIter
,
(
void
**
)
&
pTopic
);
if
(
pIter
==
NULL
)
break
;
if
(
strcmp
(
pTopic
->
db
,
dbName
)
==
0
)
{
numOfTopics
++
;
}
sdbRelease
(
pSdb
,
pTopic
);
}
*
pNumOfTopics
=
numOfTopics
;
return
0
;
}
static
int32_t
mndGetTopicMeta
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
STableMetaMsg
*
pMeta
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
if
(
mndGetNumOfTopics
(
pMnode
,
pShow
->
db
,
&
pShow
->
numOfRows
)
!=
0
)
{
return
-
1
;
}
int32_t
cols
=
0
;
SSchema
*
pSchema
=
pMeta
->
pSchema
;
pShow
->
bytes
[
cols
]
=
TSDB_TABLE_NAME_LEN
+
VARSTR_HEADER_SIZE
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"name"
);
pSchema
[
cols
].
bytes
=
htonl
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
8
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
strcpy
(
pSchema
[
cols
].
name
,
"create_time"
);
pSchema
[
cols
].
bytes
=
htonl
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
4
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_INT
;
strcpy
(
pSchema
[
cols
].
name
,
"columns"
);
pSchema
[
cols
].
bytes
=
htonl
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
4
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_INT
;
strcpy
(
pSchema
[
cols
].
name
,
"tags"
);
pSchema
[
cols
].
bytes
=
htonl
(
pShow
->
bytes
[
cols
]);
cols
++
;
pMeta
->
numOfColumns
=
htonl
(
cols
);
pShow
->
numOfColumns
=
cols
;
pShow
->
offset
[
0
]
=
0
;
for
(
int32_t
i
=
1
;
i
<
cols
;
++
i
)
{
pShow
->
offset
[
i
]
=
pShow
->
offset
[
i
-
1
]
+
pShow
->
bytes
[
i
-
1
];
}
pShow
->
numOfRows
=
sdbGetSize
(
pSdb
,
SDB_TOPIC
);
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
strcpy
(
pMeta
->
tbFname
,
mndShowStr
(
pShow
->
type
));
return
0
;
}
static
void
mndExtractTableName
(
char
*
tableId
,
char
*
name
)
{
int
pos
=
-
1
;
int
num
=
0
;
for
(
pos
=
0
;
tableId
[
pos
]
!=
0
;
++
pos
)
{
if
(
tableId
[
pos
]
==
'.'
)
num
++
;
if
(
num
==
2
)
break
;
}
if
(
num
==
2
)
{
strcpy
(
name
,
tableId
+
pos
+
1
);
}
}
static
int32_t
mndRetrieveTopic
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
numOfRows
=
0
;
STopicObj
*
pTopic
=
NULL
;
int32_t
cols
=
0
;
char
*
pWrite
;
char
prefix
[
64
]
=
{
0
};
tstrncpy
(
prefix
,
pShow
->
db
,
64
);
strcat
(
prefix
,
TS_PATH_DELIMITER
);
int32_t
prefixLen
=
(
int32_t
)
strlen
(
prefix
);
while
(
numOfRows
<
rows
)
{
pShow
->
pIter
=
sdbFetch
(
pSdb
,
SDB_TOPIC
,
pShow
->
pIter
,
(
void
**
)
&
pTopic
);
if
(
pShow
->
pIter
==
NULL
)
break
;
if
(
strncmp
(
pTopic
->
name
,
prefix
,
prefixLen
)
!=
0
)
{
sdbRelease
(
pSdb
,
pTopic
);
continue
;
}
cols
=
0
;
char
topicName
[
TSDB_TABLE_NAME_LEN
]
=
{
0
};
tstrncpy
(
topicName
,
pTopic
->
name
+
prefixLen
,
TSDB_TABLE_NAME_LEN
);
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
STR_TO_VARSTR
(
pWrite
,
topicName
);
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int64_t
*
)
pWrite
=
pTopic
->
createTime
;
cols
++
;
/*pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;*/
/**(int32_t *)pWrite = pTopic->numOfColumns;*/
/*cols++;*/
/*pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;*/
/**(int32_t *)pWrite = pTopic->numOfTags;*/
/*cols++;*/
numOfRows
++
;
sdbRelease
(
pSdb
,
pTopic
);
}
pShow
->
numOfReads
+=
numOfRows
;
mndVacuumResult
(
data
,
pShow
->
numOfColumns
,
numOfRows
,
rows
,
pShow
);
return
numOfRows
;
}
static
void
mndCancelGetNextTopic
(
SMnode
*
pMnode
,
void
*
pIter
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbCancelFetch
(
pSdb
,
pIter
);
}
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
bed4c349
...
@@ -696,7 +696,8 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
...
@@ -696,7 +696,8 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
for
(
int32_t
action
=
0
;
action
<
numOfActions
;
++
action
)
{
for
(
int32_t
action
=
0
;
action
<
numOfActions
;
++
action
)
{
STransAction
*
pAction
=
taosArrayGet
(
pArray
,
action
);
STransAction
*
pAction
=
taosArrayGet
(
pArray
,
action
);
if
(
pAction
==
NULL
)
continue
;
if
(
pAction
==
NULL
)
continue
;
if
(
pAction
->
msgReceived
&&
pAction
->
errCode
==
0
)
continue
;
if
(
pAction
->
msgSent
&&
!
pAction
->
msgReceived
)
continue
;
if
(
pAction
->
msgSent
&&
pAction
->
msgReceived
&&
pAction
->
errCode
==
0
)
continue
;
int64_t
signature
=
pTrans
->
id
;
int64_t
signature
=
pTrans
->
id
;
signature
=
(
signature
<<
32
);
signature
=
(
signature
<<
32
);
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
bed4c349
...
@@ -337,8 +337,16 @@ static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg) {
...
@@ -337,8 +337,16 @@ static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg) {
return
0
;
return
0
;
}
}
static
int32_t
mndProcessAlterVnodeRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
static
int32_t
mndProcessAlterVnodeRsp
(
SMnodeMsg
*
pMsg
)
{
static
int32_t
mndProcessDropVnodeRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
mndTransHandleActionRsp
(
pMsg
);
return
0
;
}
static
int32_t
mndProcessDropVnodeRsp
(
SMnodeMsg
*
pMsg
)
{
mndTransHandleActionRsp
(
pMsg
);
return
0
;
}
static
int32_t
mndProcessSyncVnodeRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
static
int32_t
mndProcessSyncVnodeRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
static
int32_t
mndProcessCompactVnodeRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
static
int32_t
mndProcessCompactVnodeRsp
(
SMnodeMsg
*
pMsg
)
{
return
0
;
}
...
...
source/dnode/mnode/impl/src/mnode.c
浏览文件 @
bed4c349
...
@@ -27,6 +27,7 @@
...
@@ -27,6 +27,7 @@
#include "mndStb.h"
#include "mndStb.h"
#include "mndSync.h"
#include "mndSync.h"
#include "mndTelem.h"
#include "mndTelem.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "mndVgroup.h"
...
@@ -132,6 +133,7 @@ static int32_t mndInitSteps(SMnode *pMnode) {
...
@@ -132,6 +133,7 @@ static int32_t mndInitSteps(SMnode *pMnode) {
if
(
mndAllocStep
(
pMnode
,
"mnode-db"
,
mndInitDb
,
mndCleanupDb
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-db"
,
mndInitDb
,
mndCleanupDb
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-vgroup"
,
mndInitVgroup
,
mndCleanupVgroup
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-vgroup"
,
mndInitVgroup
,
mndCleanupVgroup
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-stb"
,
mndInitStb
,
mndCleanupStb
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-stb"
,
mndInitStb
,
mndCleanupStb
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-topic"
,
mndInitTopic
,
mndCleanupTopic
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-func"
,
mndInitFunc
,
mndCleanupFunc
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-func"
,
mndInitFunc
,
mndCleanupFunc
)
!=
0
)
return
-
1
;
if
(
pMnode
->
clusterId
<=
0
)
{
if
(
pMnode
->
clusterId
<=
0
)
{
if
(
mndAllocStep
(
pMnode
,
"mnode-sdb-deploy"
,
mndDeploySdb
,
NULL
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-sdb-deploy"
,
mndDeploySdb
,
NULL
)
!=
0
)
return
-
1
;
...
@@ -227,7 +229,7 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
...
@@ -227,7 +229,7 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
}
}
return
0
;
return
0
;
}
}
SMnode
*
mndOpen
(
const
char
*
path
,
const
SMnodeOpt
*
pOption
)
{
SMnode
*
mndOpen
(
const
char
*
path
,
const
SMnodeOpt
*
pOption
)
{
mDebug
(
"start to open mnode in %s"
,
path
);
mDebug
(
"start to open mnode in %s"
,
path
);
...
@@ -442,4 +444,4 @@ uint64_t mndGenerateUid(char *name, int32_t len) {
...
@@ -442,4 +444,4 @@ uint64_t mndGenerateUid(char *name, int32_t len) {
int32_t
hashval
=
MurmurHash3_32
(
name
,
len
);
int32_t
hashval
=
MurmurHash3_32
(
name
,
len
);
uint64_t
x
=
(
us
&
0x000000FFFFFFFFFF
)
<<
24
;
uint64_t
x
=
(
us
&
0x000000FFFFFFFFFF
)
<<
24
;
return
x
+
((
hashval
&
((
1ul
<<
16
)
-
1ul
))
<<
8
)
+
(
taosRand
()
&
((
1ul
<<
8
)
-
1ul
));
return
x
+
((
hashval
&
((
1ul
<<
16
)
-
1ul
))
<<
8
)
+
(
taosRand
()
&
((
1ul
<<
8
)
-
1ul
));
}
}
\ No newline at end of file
source/dnode/mnode/sdb/src/sdbHash.c
浏览文件 @
bed4c349
...
@@ -199,6 +199,7 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey) {
...
@@ -199,6 +199,7 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey) {
SSdbRow
*
pRow
=
*
ppRow
;
SSdbRow
*
pRow
=
*
ppRow
;
switch
(
pRow
->
status
)
{
switch
(
pRow
->
status
)
{
case
SDB_STATUS_READY
:
case
SDB_STATUS_READY
:
case
SDB_STATUS_UPDATING
:
atomic_add_fetch_32
(
&
pRow
->
refCount
,
1
);
atomic_add_fetch_32
(
&
pRow
->
refCount
,
1
);
pRet
=
pRow
->
pObj
;
pRet
=
pRow
->
pObj
;
break
;
break
;
...
...
tests/script/general/db/basic1.sim
浏览文件 @
bed4c349
system sh/stop_dnodes.sh
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode1 -s start
sleep 2000
sql connect
sql connect
print =============== create database
print =============== create database
...
...
tests/script/general/table/basic1.sim
浏览文件 @
bed4c349
system sh/stop_dnodes.sh
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode1 -s start
sleep 2000
sql connect
sql connect
print =============== create database
print =============== create database
...
@@ -13,29 +12,24 @@ endi
...
@@ -13,29 +12,24 @@ endi
print $data00 $data01 $data02
print $data00 $data01 $data02
print =============== create normal table
sql use d1
sql create table d1.n1 (ts timestamp, i int)
sql show d1.tables
if $rows != 1 then
return -1
endi
print $data00 $data01 $data02
print =============== create super table
print =============== create super table
sql create table
d1.
st (ts timestamp, i int) tags (j int)
sql create table st (ts timestamp, i int) tags (j int)
sql show
d1.
stables
sql show stables
if $rows != 1 then
if $rows != 1 then
return -1
return -1
endi
endi
print $data00 $data01 $data02
print $data00 $data01 $data02
return
print =============== create child table
print =============== create child table
sql create table
d1.c1 using d1.
st tags(1)
sql create table
c1 using
st tags(1)
sql create table
d1.c2 using d1.
st tags(2)
sql create table
c2 using
st tags(2)
sql show
d1.
tables
sql show tables
if $rows !=
3
then
if $rows !=
2
then
return -1
return -1
endi
endi
...
@@ -44,12 +38,12 @@ print $data10 $data11 $data22
...
@@ -44,12 +38,12 @@ print $data10 $data11 $data22
print $data20 $data11 $data22
print $data20 $data11 $data22
print =============== insert data
print =============== insert data
sql insert into
d1.n
1 values(now+1s, 1)
sql insert into
c
1 values(now+1s, 1)
sql insert into
d1.n
1 values(now+2s, 2)
sql insert into
c
1 values(now+2s, 2)
sql insert into
d1.n
1 values(now+3s, 3)
sql insert into
c
1 values(now+3s, 3)
print =============== query data
print =============== query data
sql select * from
d1.n
1
sql select * from
c
1
if $rows != 3 then
if $rows != 3 then
return -1
return -1
endi
endi
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录