Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
69b2ec09
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
69b2ec09
编写于
3月 23, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
sma
上级
bd6cde63
变更
6
显示空白变更内容
内联
并排
Showing
6 changed file
with
751 addition
and
10 deletion
+751
-10
include/common/tmsg.h
include/common/tmsg.h
+5
-3
include/dnode/mnode/sdb/sdb.h
include/dnode/mnode/sdb/sdb.h
+5
-4
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+3
-1
source/dnode/mnode/impl/inc/mndSma.h
source/dnode/mnode/impl/inc/mndSma.h
+1
-1
source/dnode/mnode/impl/src/mndSma.c
source/dnode/mnode/impl/src/mndSma.c
+737
-0
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+0
-1
未找到文件。
include/common/tmsg.h
浏览文件 @
69b2ec09
...
...
@@ -108,6 +108,7 @@ typedef enum _mgmt_table {
TSDB_MGMT_TABLE_STREAMTABLES
,
TSDB_MGMT_TABLE_TP
,
TSDB_MGMT_TABLE_FUNC
,
TSDB_MGMT_TABLE_INDEX
,
TSDB_MGMT_TABLE_MAX
,
}
EShowType
;
...
...
@@ -1930,8 +1931,9 @@ typedef struct {
int8_t
version
;
// for compatibility(default 0)
int8_t
intervalUnit
;
// MACRO: TIME_UNIT_XXX
int8_t
slidingUnit
;
// MACRO: TIME_UNIT_XXX
int8_t
timezoneInt
;
// sma data expired if timezone changes.
char
indexName
[
TSDB_INDEX_NAME_LEN
];
char
timezone
[
TD_TIMEZONE_LEN
];
// sma data expired if timezone changes.
char
timezone
[
TD_TIMEZONE_LEN
];
int32_t
exprLen
;
int32_t
tagsFilterLen
;
int64_t
indexUid
;
...
...
@@ -2038,8 +2040,8 @@ static FORCE_INLINE int32_t tEncodeTSma(void** buf, const STSma* pSma) {
tlen
+=
taosEncodeFixedI8
(
buf
,
pSma
->
version
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pSma
->
intervalUnit
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pSma
->
slidingUnit
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pSma
->
timezoneInt
);
tlen
+=
taosEncodeString
(
buf
,
pSma
->
indexName
);
tlen
+=
taosEncodeString
(
buf
,
pSma
->
timezone
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pSma
->
exprLen
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pSma
->
tagsFilterLen
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pSma
->
indexUid
);
...
...
@@ -2073,8 +2075,8 @@ static FORCE_INLINE void* tDecodeTSma(void* buf, STSma* pSma) {
buf
=
taosDecodeFixedI8
(
buf
,
&
pSma
->
version
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pSma
->
intervalUnit
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pSma
->
slidingUnit
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pSma
->
timezoneInt
);
buf
=
taosDecodeStringTo
(
buf
,
pSma
->
indexName
);
buf
=
taosDecodeStringTo
(
buf
,
pSma
->
timezone
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pSma
->
exprLen
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pSma
->
tagsFilterLen
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pSma
->
indexUid
);
...
...
include/dnode/mnode/sdb/sdb.h
浏览文件 @
69b2ec09
...
...
@@ -119,10 +119,11 @@ typedef enum {
SDB_CONSUMER
=
13
,
SDB_TOPIC
=
14
,
SDB_VGROUP
=
15
,
SDB_STB
=
16
,
SDB_DB
=
17
,
SDB_FUNC
=
18
,
SDB_MAX
=
19
SDB_SMA
=
16
,
SDB_STB
=
17
,
SDB_DB
=
18
,
SDB_FUNC
=
19
,
SDB_MAX
=
20
}
ESdbType
;
typedef
struct
SSdb
SSdb
;
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
69b2ec09
...
...
@@ -309,10 +309,12 @@ typedef struct {
typedef
struct
{
char
name
[
TSDB_TABLE_FNAME_LEN
];
char
stb
[
TSDB_DB_FNAME_LEN
];
char
stb
[
TSDB_TABLE_FNAME_LEN
];
char
db
[
TSDB_DB_FNAME_LEN
];
int64_t
createdTime
;
int64_t
uid
;
int64_t
stbUid
;
int64_t
dbUid
;
int8_t
intervalUnit
;
int8_t
slidingUnit
;
int8_t
timezone
;
...
...
source/dnode/mnode/impl/inc/mndSma.h
浏览文件 @
69b2ec09
...
...
@@ -25,7 +25,7 @@ extern "C" {
int32_t
mndInitSma
(
SMnode
*
pMnode
);
void
mndCleanupSma
(
SMnode
*
pMnode
);
SSmaObj
*
mndAcquireSma
(
SMnode
*
pMnode
,
char
*
smaName
);
void
mndReleaseSma
(
SMnode
*
pMnode
,
SS
tbObj
*
pStb
);
void
mndReleaseSma
(
SMnode
*
pMnode
,
SS
maObj
*
pSma
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/src/mndSma.c
0 → 100644
浏览文件 @
69b2ec09
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mndSma.h"
#include "mndAuth.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndInfoSchema.h"
#include "mndMnode.h"
#include "mndShow.h"
#include "mndStb.c"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tname.h"
#define TSDB_SMA_VER_NUMBER 1
#define TSDB_SMA_RESERVE_SIZE 64
static
SSdbRaw
*
mndSmaActionEncode
(
SSmaObj
*
pSma
);
static
SSdbRow
*
mndSmaActionDecode
(
SSdbRaw
*
pRaw
);
static
int32_t
mndSmaActionInsert
(
SSdb
*
pSdb
,
SSmaObj
*
pSma
);
static
int32_t
mndSmaActionDelete
(
SSdb
*
pSdb
,
SSmaObj
*
pSpSmatb
);
static
int32_t
mndSmaActionUpdate
(
SSdb
*
pSdb
,
SSmaObj
*
pOld
,
SSmaObj
*
pNew
);
static
int32_t
mndProcessMCreateSmaReq
(
SNodeMsg
*
pReq
);
static
int32_t
mndProcessMDropSmaReq
(
SNodeMsg
*
pReq
);
static
int32_t
mndProcessVCreateSmaRsp
(
SNodeMsg
*
pRsp
);
static
int32_t
mndProcessVDropSmaRsp
(
SNodeMsg
*
pRsp
);
static
int32_t
mndGetSmaMeta
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
STableMetaRsp
*
pMeta
);
static
int32_t
mndRetrieveSma
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
);
static
void
mndCancelGetNextSma
(
SMnode
*
pMnode
,
void
*
pIter
);
int32_t
mndInitSma
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_SMA
,
.
keyType
=
SDB_KEY_BINARY
,
.
encodeFp
=
(
SdbEncodeFp
)
mndSmaActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndSmaActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndSmaActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndSmaActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndSmaActionDelete
};
mndSetMsgHandle
(
pMnode
,
TDMT_MND_CREATE_SMA
,
mndProcessMCreateSmaReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_DROP_SMA
,
mndProcessMDropSmaReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_CREATE_SMA_RSP
,
mndProcessVCreateSmaRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_DROP_SMA_RSP
,
mndProcessVDropSmaRsp
);
mndAddShowMetaHandle
(
pMnode
,
TSDB_MGMT_TABLE_INDEX
,
mndGetSmaMeta
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_INDEX
,
mndRetrieveSma
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_INDEX
,
mndCancelGetNextSma
);
return
sdbSetTable
(
pMnode
->
pSdb
,
table
);
}
void
mndCleanupSma
(
SMnode
*
pMnode
)
{}
static
SSdbRaw
*
mndSmaActionEncode
(
SSmaObj
*
pSma
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
int32_t
size
=
sizeof
(
SSmaObj
)
+
pSma
->
exprLen
+
pSma
->
tagsFilterLen
+
TSDB_SMA_RESERVE_SIZE
;
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_SMA
,
TSDB_SMA_VER_NUMBER
,
size
);
if
(
pRaw
==
NULL
)
goto
_OVER
;
int32_t
dataPos
=
0
;
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pSma
->
name
,
TSDB_TABLE_FNAME_LEN
,
_OVER
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pSma
->
stb
,
TSDB_TABLE_FNAME_LEN
,
_OVER
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pSma
->
db
,
TSDB_DB_FNAME_LEN
,
_OVER
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pSma
->
createdTime
,
_OVER
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pSma
->
uid
,
_OVER
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pSma
->
stbUid
,
_OVER
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pSma
->
dbUid
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pSma
->
intervalUnit
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pSma
->
slidingUnit
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pSma
->
timezone
,
_OVER
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pSma
->
interval
,
_OVER
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pSma
->
offset
,
_OVER
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pSma
->
sliding
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pSma
->
exprLen
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pSma
->
tagsFilterLen
,
_OVER
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pSma
->
expr
,
pSma
->
exprLen
,
_OVER
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pSma
->
tagsFilter
,
pSma
->
tagsFilterLen
,
_OVER
)
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
TSDB_SMA_RESERVE_SIZE
,
_OVER
)
SDB_SET_DATALEN
(
pRaw
,
dataPos
,
_OVER
)
terrno
=
0
;
_OVER:
if
(
terrno
!=
0
)
{
mError
(
"sma:%s, failed to encode to raw:%p since %s"
,
pSma
->
name
,
pRaw
,
terrstr
());
sdbFreeRaw
(
pRaw
);
return
NULL
;
}
mTrace
(
"sma:%s, encode to raw:%p, row:%p"
,
pSma
->
name
,
pRaw
,
pSma
);
return
pRaw
;
}
static
SSdbRow
*
mndSmaActionDecode
(
SSdbRaw
*
pRaw
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
int8_t
sver
=
0
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
goto
_OVER
;
if
(
sver
!=
TSDB_SMA_VER_NUMBER
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_VER
;
goto
_OVER
;
}
SSdbRow
*
pRow
=
sdbAllocRow
(
sizeof
(
SSmaObj
));
if
(
pRow
==
NULL
)
goto
_OVER
;
SSmaObj
*
pSma
=
sdbGetRowObj
(
pRow
);
if
(
pSma
==
NULL
)
goto
_OVER
;
int32_t
dataPos
=
0
;
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pSma
->
name
,
TSDB_TABLE_FNAME_LEN
,
_OVER
)
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pSma
->
stb
,
TSDB_TABLE_FNAME_LEN
,
_OVER
)
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pSma
->
db
,
TSDB_DB_FNAME_LEN
,
_OVER
)
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pSma
->
createdTime
,
_OVER
)
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pSma
->
uid
,
_OVER
)
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pSma
->
stbUid
,
_OVER
)
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pSma
->
dbUid
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
pSma
->
intervalUnit
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
pSma
->
slidingUnit
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
pSma
->
timezone
,
_OVER
)
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pSma
->
interval
,
_OVER
)
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pSma
->
offset
,
_OVER
)
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pSma
->
sliding
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pSma
->
exprLen
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pSma
->
tagsFilterLen
,
_OVER
)
pSma
->
expr
=
calloc
(
pSma
->
exprLen
,
1
);
pSma
->
tagsFilter
=
calloc
(
pSma
->
tagsFilterLen
,
1
);
if
(
pSma
->
expr
==
NULL
||
pSma
->
tagsFilter
==
NULL
)
{
goto
_OVER
;
}
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pSma
->
expr
,
pSma
->
exprLen
,
_OVER
)
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pSma
->
tagsFilter
,
pSma
->
tagsFilterLen
,
_OVER
)
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
TSDB_SMA_RESERVE_SIZE
,
_OVER
)
terrno
=
0
;
_OVER:
if
(
terrno
!=
0
)
{
mError
(
"sma:%s, failed to decode from raw:%p since %s"
,
pSma
->
name
,
pRaw
,
terrstr
());
tfree
(
pSma
->
expr
);
tfree
(
pSma
->
tagsFilter
);
tfree
(
pRow
);
return
NULL
;
}
mTrace
(
"sma:%s, decode from raw:%p, row:%p"
,
pSma
->
name
,
pRaw
,
pSma
);
return
pRow
;
}
static
int32_t
mndSmaActionInsert
(
SSdb
*
pSdb
,
SSmaObj
*
pSma
)
{
mTrace
(
"sma:%s, perform insert action, row:%p"
,
pSma
->
name
,
pSma
);
return
0
;
}
static
int32_t
mndSmaActionDelete
(
SSdb
*
pSdb
,
SSmaObj
*
pSma
)
{
mTrace
(
"sma:%s, perform delete action, row:%p"
,
pSma
->
name
,
pSma
);
tfree
(
pSma
->
tagsFilter
);
tfree
(
pSma
->
expr
);
return
0
;
}
static
int32_t
mndSmaActionUpdate
(
SSdb
*
pSdb
,
SSmaObj
*
pOld
,
SSmaObj
*
pNew
)
{
mTrace
(
"sma:%s, perform update action, old row:%p new row:%p"
,
pOld
->
name
,
pOld
,
pNew
);
return
0
;
}
SSmaObj
*
mndAcquireSma
(
SMnode
*
pMnode
,
char
*
smaName
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SSmaObj
*
pSma
=
sdbAcquire
(
pSdb
,
SDB_SMA
,
smaName
);
if
(
pSma
==
NULL
&&
terrno
==
TSDB_CODE_SDB_OBJ_NOT_THERE
)
{
terrno
=
TSDB_CODE_MND_SMA_NOT_EXIST
;
}
return
pSma
;
}
void
mndReleaseSma
(
SMnode
*
pMnode
,
SSmaObj
*
pSma
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbRelease
(
pSdb
,
pSma
);
}
SDbObj
*
mndAcquireDbBySma
(
SMnode
*
pMnode
,
const
char
*
smaName
)
{
SName
name
=
{
0
};
tNameFromString
(
&
name
,
smaName
,
T_NAME_ACCT
|
T_NAME_DB
|
T_NAME_TABLE
);
char
db
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
tNameGetFullDbName
(
&
name
,
db
);
return
mndAcquireDb
(
pMnode
,
db
);
}
static
void
*
mndBuildVCreateSmaReq
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
,
SSmaObj
*
pSma
,
int32_t
*
pContLen
)
{
SName
name
=
{
0
};
tNameFromString
(
&
name
,
pSma
->
name
,
T_NAME_ACCT
|
T_NAME_DB
|
T_NAME_TABLE
);
SVCreateTSmaReq
req
=
{
0
};
req
.
tSma
.
version
=
0
;
req
.
tSma
.
intervalUnit
=
pSma
->
intervalUnit
;
req
.
tSma
.
slidingUnit
=
pSma
->
slidingUnit
;
req
.
tSma
.
slidingUnit
=
pSma
->
timezone
;
tstrncpy
(
req
.
tSma
.
indexName
,
(
char
*
)
tNameGetTableName
(
&
name
),
TSDB_INDEX_NAME_LEN
);
req
.
tSma
.
exprLen
=
pSma
->
exprLen
;
req
.
tSma
.
tagsFilterLen
=
pSma
->
tagsFilterLen
;
req
.
tSma
.
indexUid
=
pSma
->
uid
;
req
.
tSma
.
tableUid
=
pSma
->
stbUid
;
req
.
tSma
.
interval
=
pSma
->
interval
;
req
.
tSma
.
offset
=
pSma
->
offset
;
req
.
tSma
.
sliding
=
pSma
->
sliding
;
req
.
tSma
.
expr
=
pSma
->
expr
;
req
.
tSma
.
tagsFilter
=
pSma
->
tagsFilter
;
int32_t
contLen
=
tSerializeSVCreateTSmaReq
(
NULL
,
&
req
)
+
sizeof
(
SMsgHead
);
SMsgHead
*
pHead
=
malloc
(
contLen
);
if
(
pHead
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
pHead
->
contLen
=
htonl
(
contLen
);
pHead
->
vgId
=
htonl
(
pVgroup
->
vgId
);
void
*
pBuf
=
POINTER_SHIFT
(
pHead
,
sizeof
(
SMsgHead
));
tSerializeSVCreateTSmaReq
(
&
pBuf
,
&
req
);
*
pContLen
=
contLen
;
return
pHead
;
}
static
void
*
mndBuildVDropSmaReq
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
,
SSmaObj
*
pSma
,
int32_t
*
pContLen
)
{
SName
name
=
{
0
};
tNameFromString
(
&
name
,
pSma
->
name
,
T_NAME_ACCT
|
T_NAME_DB
|
T_NAME_TABLE
);
SVDropTSmaReq
req
=
{
0
};
req
.
ver
=
0
;
req
.
indexUid
=
pSma
->
uid
;
tstrncpy
(
req
.
indexName
,
(
char
*
)
tNameGetTableName
(
&
name
),
TSDB_INDEX_NAME_LEN
);
int32_t
contLen
=
tSerializeSVDropTSmaReq
(
NULL
,
&
req
)
+
sizeof
(
SMsgHead
);
SMsgHead
*
pHead
=
malloc
(
contLen
);
if
(
pHead
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
pHead
->
contLen
=
htonl
(
contLen
);
pHead
->
vgId
=
htonl
(
pVgroup
->
vgId
);
void
*
pBuf
=
POINTER_SHIFT
(
pHead
,
sizeof
(
SMsgHead
));
tDeserializeSVDropTSmaReq
(
&
pBuf
,
&
req
);
*
pContLen
=
contLen
;
return
pHead
;
}
static
int32_t
mndSetCreateSmaRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SSmaObj
*
pSma
)
{
SSdbRaw
*
pRedoRaw
=
mndSmaActionEncode
(
pSma
);
if
(
pRedoRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendRedolog
(
pTrans
,
pRedoRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_CREATING
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetCreateSmaUndoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SSmaObj
*
pSma
)
{
SSdbRaw
*
pUndoRaw
=
mndSmaActionEncode
(
pSma
);
if
(
pUndoRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendUndolog
(
pTrans
,
pUndoRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pUndoRaw
,
SDB_STATUS_DROPPED
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetCreateSmaCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SSmaObj
*
pSma
)
{
SSdbRaw
*
pCommitRaw
=
mndSmaActionEncode
(
pSma
);
if
(
pCommitRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_READY
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetCreateSmaRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SSmaObj
*
pSma
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SVgObj
*
pVgroup
=
NULL
;
void
*
pIter
=
NULL
;
int32_t
contLen
;
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
if
(
pIter
==
NULL
)
break
;
if
(
pVgroup
->
dbUid
!=
pDb
->
uid
)
{
sdbRelease
(
pSdb
,
pVgroup
);
continue
;
}
void
*
pReq
=
mndBuildVCreateSmaReq
(
pMnode
,
pVgroup
,
pSma
,
&
contLen
);
if
(
pReq
==
NULL
)
{
sdbCancelFetch
(
pSdb
,
pIter
);
sdbRelease
(
pSdb
,
pVgroup
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
STransAction
action
=
{
0
};
action
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
action
.
pCont
=
pReq
;
action
.
contLen
=
contLen
;
action
.
msgType
=
TDMT_VND_CREATE_SMA
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
free
(
pReq
);
sdbCancelFetch
(
pSdb
,
pIter
);
sdbRelease
(
pSdb
,
pVgroup
);
return
-
1
;
}
sdbRelease
(
pSdb
,
pVgroup
);
}
return
0
;
}
static
int32_t
mndSetCreateSmaUndoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SSmaObj
*
pSma
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SVgObj
*
pVgroup
=
NULL
;
void
*
pIter
=
NULL
;
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
if
(
pIter
==
NULL
)
break
;
if
(
pVgroup
->
dbUid
!=
pDb
->
uid
)
{
sdbRelease
(
pSdb
,
pVgroup
);
continue
;
}
int32_t
contLen
=
0
;
void
*
pReq
=
mndBuildVDropSmaReq
(
pMnode
,
pVgroup
,
pSma
,
&
contLen
);
if
(
pReq
==
NULL
)
{
sdbCancelFetch
(
pSdb
,
pIter
);
sdbRelease
(
pSdb
,
pVgroup
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
STransAction
action
=
{
0
};
action
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
action
.
pCont
=
pReq
;
action
.
contLen
=
contLen
;
action
.
msgType
=
TDMT_VND_DROP_SMA
;
if
(
mndTransAppendUndoAction
(
pTrans
,
&
action
)
!=
0
)
{
free
(
pReq
);
sdbCancelFetch
(
pSdb
,
pIter
);
sdbRelease
(
pSdb
,
pVgroup
);
return
-
1
;
}
sdbRelease
(
pSdb
,
pVgroup
);
}
return
0
;
}
static
int32_t
mndCreateTSma
(
SMnode
*
pMnode
,
SNodeMsg
*
pReq
,
SMCreateSmaReq
*
pCreate
,
SDbObj
*
pDb
,
SStbObj
*
pStb
)
{
SSmaObj
smaObj
=
{
0
};
memcpy
(
smaObj
.
name
,
pCreate
->
name
,
TSDB_TABLE_FNAME_LEN
);
memcpy
(
smaObj
.
stb
,
pStb
->
name
,
TSDB_TABLE_FNAME_LEN
);
smaObj
.
createdTime
=
taosGetTimestampMs
();
smaObj
.
uid
=
mndGenerateUid
(
pCreate
->
name
,
TSDB_TABLE_FNAME_LEN
);
smaObj
.
stbUid
=
pStb
->
uid
;
smaObj
.
intervalUnit
=
pCreate
->
intervalUnit
;
smaObj
.
slidingUnit
=
pCreate
->
slidingUnit
;
smaObj
.
timezone
=
pCreate
->
timezone
;
smaObj
.
interval
=
pCreate
->
interval
;
smaObj
.
offset
=
pCreate
->
offset
;
smaObj
.
sliding
=
pCreate
->
sliding
;
smaObj
.
exprLen
=
pCreate
->
exprLen
;
smaObj
.
tagsFilterLen
=
pCreate
->
tagsFilterLen
;
smaObj
.
expr
=
pCreate
->
expr
;
pCreate
->
expr
=
NULL
;
smaObj
.
tagsFilter
=
pCreate
->
tagsFilter
;
pCreate
->
tagsFilter
=
NULL
;
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_TYPE_CREATE_SMA
,
&
pReq
->
rpcMsg
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to create sma:%s"
,
pTrans
->
id
,
pCreate
->
name
);
mndTransSetDbInfo
(
pTrans
,
pDb
);
if
(
mndSetCreateSmaRedoLogs
(
pMnode
,
pTrans
,
&
smaObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateSmaUndoLogs
(
pMnode
,
pTrans
,
&
smaObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateSmaCommitLogs
(
pMnode
,
pTrans
,
&
smaObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateSmaRedoActions
(
pMnode
,
pTrans
,
pDb
,
&
smaObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateSmaUndoActions
(
pMnode
,
pTrans
,
pDb
,
&
smaObj
)
!=
0
)
goto
_OVER
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
code
=
0
;
_OVER:
mndTransDrop
(
pTrans
);
return
code
;
}
static
int32_t
mndCheckCreateSmaReq
(
SMCreateSmaReq
*
pCreate
)
{
if
(
pCreate
->
igExists
<
0
||
pCreate
->
igExists
>
1
)
{
terrno
=
TSDB_CODE_MND_INVALID_STB_OPTION
;
return
-
1
;
}
return
0
;
}
static
int32_t
mndProcessMCreateSmaReq
(
SNodeMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
pNode
;
int32_t
code
=
-
1
;
SStbObj
*
pStb
=
NULL
;
SSmaObj
*
pSma
=
NULL
;
SDbObj
*
pDb
=
NULL
;
SUserObj
*
pUser
=
NULL
;
SMCreateSmaReq
createReq
=
{
0
};
if
(
tDeserializeSMCreateSmaReq
(
pReq
->
rpcMsg
.
pCont
,
pReq
->
rpcMsg
.
contLen
,
&
createReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
goto
_OVER
;
}
mDebug
(
"sma:%s, start to create"
,
createReq
.
name
);
if
(
mndCheckCreateSmaReq
(
&
createReq
)
!=
0
)
{
goto
_OVER
;
}
pStb
=
mndAcquireStb
(
pMnode
,
createReq
.
stb
);
if
(
pStb
==
NULL
)
{
mError
(
"sma:%s, failed to create since stb:%s not exist"
,
createReq
.
name
,
createReq
.
stb
);
goto
_OVER
;
}
pSma
=
mndAcquireSma
(
pMnode
,
createReq
.
name
);
if
(
pSma
!=
NULL
)
{
if
(
createReq
.
igExists
)
{
mDebug
(
"sma:%s, already exist in sma:%s, ignore exist is set"
,
createReq
.
name
,
pSma
->
name
);
code
=
0
;
goto
_OVER
;
}
else
{
terrno
=
TSDB_CODE_MND_SMA_ALREADY_EXIST
;
goto
_OVER
;
}
}
pDb
=
mndAcquireDbBySma
(
pMnode
,
createReq
.
name
);
if
(
pDb
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DB_NOT_SELECTED
;
goto
_OVER
;
}
pUser
=
mndAcquireUser
(
pMnode
,
pReq
->
user
);
if
(
pUser
==
NULL
)
{
goto
_OVER
;
}
if
(
mndCheckWriteAuth
(
pUser
,
pDb
)
!=
0
)
{
goto
_OVER
;
}
code
=
mndCreateTSma
(
pMnode
,
pReq
,
&
createReq
,
pDb
,
pStb
);
if
(
code
==
0
)
code
=
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
mError
(
"sma:%s, failed to create since %s"
,
createReq
.
name
,
terrstr
());
}
mndReleaseStb
(
pMnode
,
pStb
);
mndReleaseSma
(
pMnode
,
pSma
);
mndReleaseDb
(
pMnode
,
pDb
);
mndReleaseUser
(
pMnode
,
pUser
);
tFreeSMCreateSmaReq
(
&
createReq
);
return
code
;
}
static
int32_t
mndProcessVCreateSmaRsp
(
SNodeMsg
*
pRsp
)
{
mndTransProcessRsp
(
pRsp
);
return
0
;
}
static
int32_t
mndSetDropSmaRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SSmaObj
*
pSma
)
{
SSdbRaw
*
pRedoRaw
=
mndSmaActionEncode
(
pSma
);
if
(
pRedoRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendRedolog
(
pTrans
,
pRedoRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_DROPPING
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetDropSmaCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SSmaObj
*
pSma
)
{
SSdbRaw
*
pCommitRaw
=
mndSmaActionEncode
(
pSma
);
if
(
pCommitRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetDropSmaRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SSmaObj
*
pSma
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SVgObj
*
pVgroup
=
NULL
;
void
*
pIter
=
NULL
;
int32_t
contLen
;
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
if
(
pIter
==
NULL
)
break
;
if
(
pVgroup
->
dbUid
!=
pDb
->
uid
)
{
sdbRelease
(
pSdb
,
pVgroup
);
continue
;
}
int32_t
contLen
=
0
;
void
*
pReq
=
mndBuildVDropSmaReq
(
pMnode
,
pVgroup
,
pSma
,
&
contLen
);
if
(
pReq
==
NULL
)
{
sdbCancelFetch
(
pSdb
,
pIter
);
sdbRelease
(
pSdb
,
pVgroup
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
STransAction
action
=
{
0
};
action
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
action
.
pCont
=
pReq
;
action
.
contLen
=
contLen
;
action
.
msgType
=
TDMT_VND_DROP_SMA
;
action
.
acceptableCode
=
TSDB_CODE_VND_TB_NOT_EXIST
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
free
(
pReq
);
sdbCancelFetch
(
pSdb
,
pIter
);
sdbRelease
(
pSdb
,
pVgroup
);
return
-
1
;
}
sdbRelease
(
pSdb
,
pVgroup
);
}
return
0
;
}
static
int32_t
mndDropSma
(
SMnode
*
pMnode
,
SNodeMsg
*
pReq
,
SDbObj
*
pDb
,
SSmaObj
*
pSma
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_TYPE_DROP_STB
,
&
pReq
->
rpcMsg
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to drop sma:%s"
,
pTrans
->
id
,
pSma
->
name
);
mndTransSetDbInfo
(
pTrans
,
pDb
);
if
(
mndSetDropSmaRedoLogs
(
pMnode
,
pTrans
,
pSma
)
!=
0
)
goto
_OVER
;
if
(
mndSetDropSmaCommitLogs
(
pMnode
,
pTrans
,
pSma
)
!=
0
)
goto
_OVER
;
if
(
mndSetDropSmaRedoActions
(
pMnode
,
pTrans
,
pDb
,
pSma
)
!=
0
)
goto
_OVER
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
code
=
0
;
_OVER:
mndTransDrop
(
pTrans
);
return
code
;
}
static
int32_t
mndProcessMDropSmaReq
(
SNodeMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
pNode
;
int32_t
code
=
-
1
;
SUserObj
*
pUser
=
NULL
;
SDbObj
*
pDb
=
NULL
;
SSmaObj
*
pSma
=
NULL
;
SMDropSmaReq
dropReq
=
{
0
};
if
(
tDeserializeSMDropSmaReq
(
pReq
->
rpcMsg
.
pCont
,
pReq
->
rpcMsg
.
contLen
,
&
dropReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
goto
_OVER
;
}
mDebug
(
"sma:%s, start to drop"
,
dropReq
.
name
);
pSma
=
mndAcquireSma
(
pMnode
,
dropReq
.
name
);
if
(
pSma
==
NULL
)
{
if
(
dropReq
.
igNotExists
)
{
mDebug
(
"sma:%s, not exist, ignore not exist is set"
,
dropReq
.
name
);
code
=
0
;
goto
_OVER
;
}
else
{
terrno
=
TSDB_CODE_MND_SMA_NOT_EXIST
;
goto
_OVER
;
}
}
pDb
=
mndAcquireDbBySma
(
pMnode
,
dropReq
.
name
);
if
(
pDb
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DB_NOT_SELECTED
;
goto
_OVER
;
}
pUser
=
mndAcquireUser
(
pMnode
,
pReq
->
user
);
if
(
pUser
==
NULL
)
{
goto
_OVER
;
}
if
(
mndCheckWriteAuth
(
pUser
,
pDb
)
!=
0
)
{
goto
_OVER
;
}
code
=
mndDropSma
(
pMnode
,
pReq
,
pDb
,
pSma
);
if
(
code
==
0
)
code
=
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
mError
(
"sma:%s, failed to drop since %s"
,
dropReq
.
name
,
terrstr
());
}
mndReleaseDb
(
pMnode
,
pDb
);
mndReleaseSma
(
pMnode
,
pSma
);
mndReleaseUser
(
pMnode
,
pUser
);
return
code
;
}
static
int32_t
mndProcessVDropSmaRsp
(
SNodeMsg
*
pRsp
)
{
mndTransProcessRsp
(
pRsp
);
return
0
;
}
static
int32_t
mndGetSmaMeta
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
STableMetaRsp
*
pMeta
)
{
SMnode
*
pMnode
=
pReq
->
pNode
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
cols
=
0
;
SSchema
*
pSchema
=
pMeta
->
pSchemas
;
pShow
->
bytes
[
cols
]
=
TSDB_TABLE_NAME_LEN
+
VARSTR_HEADER_SIZE
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"name"
);
pSchema
[
cols
].
bytes
=
pShow
->
bytes
[
cols
];
cols
++
;
pShow
->
bytes
[
cols
]
=
8
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
strcpy
(
pSchema
[
cols
].
name
,
"create_time"
);
pSchema
[
cols
].
bytes
=
pShow
->
bytes
[
cols
];
cols
++
;
pShow
->
bytes
[
cols
]
=
TSDB_TABLE_NAME_LEN
+
VARSTR_HEADER_SIZE
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"name"
);
pSchema
[
cols
].
bytes
=
pShow
->
bytes
[
cols
];
cols
++
;
pMeta
->
numOfColumns
=
cols
;
pShow
->
numOfColumns
=
cols
;
pShow
->
offset
[
0
]
=
0
;
for
(
int32_t
i
=
1
;
i
<
cols
;
++
i
)
{
pShow
->
offset
[
i
]
=
pShow
->
offset
[
i
-
1
]
+
pShow
->
bytes
[
i
-
1
];
}
pShow
->
numOfRows
=
sdbGetSize
(
pSdb
,
SDB_SMA
);
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
strcpy
(
pMeta
->
tbName
,
mndShowStr
(
pShow
->
type
));
return
0
;
}
static
int32_t
mndRetrieveSma
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
)
{
SMnode
*
pMnode
=
pReq
->
pNode
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
numOfRows
=
0
;
SSmaObj
*
pSma
=
NULL
;
int32_t
cols
=
0
;
char
*
pWrite
;
char
prefix
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
pShow
->
db
);
if
(
pDb
==
NULL
)
return
0
;
while
(
numOfRows
<
rows
)
{
pShow
->
pIter
=
sdbFetch
(
pSdb
,
SDB_SMA
,
pShow
->
pIter
,
(
void
**
)
&
pSma
);
if
(
pShow
->
pIter
==
NULL
)
break
;
if
(
pSma
->
dbUid
!=
pDb
->
uid
)
{
sdbRelease
(
pSdb
,
pSma
);
continue
;
}
cols
=
0
;
SName
smaName
=
{
0
};
tNameFromString
(
&
smaName
,
pSma
->
name
,
T_NAME_ACCT
|
T_NAME_DB
|
T_NAME_TABLE
);
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
STR_TO_VARSTR
(
pWrite
,
(
char
*
)
tNameGetTableName
(
&
smaName
));
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int64_t
*
)
pWrite
=
pSma
->
createdTime
;
cols
++
;
SName
stbName
=
{
0
};
tNameFromString
(
&
stbName
,
pSma
->
stb
,
T_NAME_ACCT
|
T_NAME_DB
|
T_NAME_TABLE
);
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
STR_TO_VARSTR
(
pWrite
,
(
char
*
)
tNameGetTableName
(
&
stbName
));
cols
++
;
numOfRows
++
;
sdbRelease
(
pSdb
,
pSma
);
}
mndReleaseDb
(
pMnode
,
pDb
);
pShow
->
numOfReads
+=
numOfRows
;
mndVacuumResult
(
data
,
pShow
->
numOfColumns
,
numOfRows
,
rows
,
pShow
);
return
numOfRows
;
}
static
void
mndCancelGetNextSma
(
SMnode
*
pMnode
,
void
*
pIter
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbCancelFetch
(
pSdb
,
pIter
);
}
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
69b2ec09
...
...
@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mndStb.h"
#include "mndAuth.h"
#include "mndDb.h"
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录