Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
31f8cf3f
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
31f8cf3f
编写于
2月 10, 2020
作者:
S
slguan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Organize mgmtUtil.c file
上级
d97941aa
变更
34
展开全部
隐藏空白更改
内联
并排
Showing
34 changed file
with
687 addition
and
681 deletion
+687
-681
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+2
-2
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+4
-4
src/client/src/tscLocal.c
src/client/src/tscLocal.c
+2
-2
src/client/src/tscParseInsert.c
src/client/src/tscParseInsert.c
+3
-3
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+6
-6
src/client/src/tscServer.c
src/client/src/tscServer.c
+14
-14
src/client/src/tscSql.c
src/client/src/tscSql.c
+1
-1
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+2
-2
src/dnode/src/dnodeMgmt.c
src/dnode/src/dnodeMgmt.c
+2
-2
src/inc/mnode.h
src/inc/mnode.h
+9
-9
src/inc/taosmsg.h
src/inc/taosmsg.h
+15
-15
src/inc/tsdb.h
src/inc/tsdb.h
+1
-1
src/mnode/inc/mgmtSupertableQuery.h
src/mnode/inc/mgmtSupertableQuery.h
+29
-0
src/mnode/inc/mgmtTable.h
src/mnode/inc/mgmtTable.h
+8
-20
src/mnode/inc/mgmtUtil.h
src/mnode/inc/mgmtUtil.h
+10
-25
src/mnode/src/mgmtConn.c
src/mnode/src/mgmtConn.c
+1
-1
src/mnode/src/mgmtDb.c
src/mnode/src/mgmtDb.c
+1
-1
src/mnode/src/mgmtDnodeInt.c
src/mnode/src/mgmtDnodeInt.c
+28
-28
src/mnode/src/mgmtMeter.c
src/mnode/src/mgmtMeter.c
+375
-375
src/mnode/src/mgmtProfile.c
src/mnode/src/mgmtProfile.c
+1
-1
src/mnode/src/mgmtShell.c
src/mnode/src/mgmtShell.c
+17
-17
src/mnode/src/mgmtSupertableQuery.c
src/mnode/src/mgmtSupertableQuery.c
+40
-33
src/mnode/src/mgmtUtil.c
src/mnode/src/mgmtUtil.c
+4
-7
src/mnode/src/mgmtVgroup.c
src/mnode/src/mgmtVgroup.c
+8
-8
src/modules/monitor/src/monitorSystem.c
src/modules/monitor/src/monitorSystem.c
+1
-1
src/rpc/src/trpc.c
src/rpc/src/trpc.c
+1
-1
src/sdb/src/hashstr.c
src/sdb/src/hashstr.c
+1
-1
src/vnode/detail/inc/vnode.h
src/vnode/detail/inc/vnode.h
+1
-1
src/vnode/detail/inc/vnodeRead.h
src/vnode/detail/inc/vnodeRead.h
+1
-1
src/vnode/detail/src/vnodeFile.c
src/vnode/detail/src/vnodeFile.c
+62
-62
src/vnode/detail/src/vnodeImport.c
src/vnode/detail/src/vnodeImport.c
+3
-3
src/vnode/detail/src/vnodeQueryImpl.c
src/vnode/detail/src/vnodeQueryImpl.c
+9
-9
src/vnode/detail/src/vnodeStream.c
src/vnode/detail/src/vnodeStream.c
+7
-7
src/vnode/detail/src/vnodeUtil.c
src/vnode/detail/src/vnodeUtil.c
+18
-18
未找到文件。
src/client/inc/tscUtil.h
浏览文件 @
31f8cf3f
...
...
@@ -30,10 +30,10 @@ extern "C" {
#include "tsdb.h"
#define UTIL_METER_IS_SUPERTABLE(metaInfo) \
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->
meter
Type == TSDB_TABLE_TYPE_SUPER_TABLE))
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->
table
Type == TSDB_TABLE_TYPE_SUPER_TABLE))
#define UTIL_METER_IS_NOMRAL_METER(metaInfo) (!(UTIL_METER_IS_SUPERTABLE(metaInfo)))
#define UTIL_METER_IS_CREATE_FROM_METRIC(metaInfo) \
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->
meter
Type == TSDB_TABLE_TYPE_CREATE_FROM_STABLE))
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->
table
Type == TSDB_TABLE_TYPE_CREATE_FROM_STABLE))
#define TSDB_COL_IS_TAG(f) (((f)&TSDB_COL_TAG) != 0)
...
...
src/client/inc/tsclient.h
浏览文件 @
31f8cf3f
...
...
@@ -54,7 +54,7 @@ typedef struct SMeterMetaInfo {
* 2. keep the vnode index for multi-vnode insertion
*/
int32_t
vnodeIndex
;
char
name
[
TSDB_
METER
_ID_LEN
+
1
];
// table(super table) name
char
name
[
TSDB_
TABLE
_ID_LEN
+
1
];
// table(super table) name
int16_t
numOfTags
;
// total required tags in query, including groupby tags
int16_t
tagColumnIndex
[
TSDB_MAX_TAGS
];
// clause + tag projection
}
SMeterMetaInfo
;
...
...
@@ -119,7 +119,7 @@ typedef struct SCond {
}
SCond
;
typedef
struct
SJoinNode
{
char
meterId
[
TSDB_
METER
_ID_LEN
];
char
meterId
[
TSDB_
TABLE
_ID_LEN
];
uint64_t
uid
;
int16_t
tagCol
;
}
SJoinNode
;
...
...
@@ -154,7 +154,7 @@ typedef struct SParamInfo {
}
SParamInfo
;
typedef
struct
STableDataBlocks
{
char
meterId
[
TSDB_
METER
_ID_LEN
];
char
meterId
[
TSDB_
TABLE
_ID_LEN
];
int8_t
tsSource
;
// where does the UNIX timestamp come from, server or client
bool
ordered
;
// if current rows are ordered or not
int64_t
vgid
;
// virtual group id
...
...
@@ -302,7 +302,7 @@ typedef struct _tsc_obj {
char
user
[
TSDB_USER_LEN
];
char
pass
[
TSDB_KEY_LEN
];
char
acctId
[
TSDB_DB_NAME_LEN
];
char
db
[
TSDB_
METER
_ID_LEN
];
char
db
[
TSDB_
TABLE
_ID_LEN
];
char
sversion
[
TSDB_VERSION_LEN
];
char
writeAuth
:
1
;
char
superAuth
:
1
;
...
...
src/client/src/tscLocal.c
浏览文件 @
31f8cf3f
...
...
@@ -79,8 +79,8 @@ static int32_t getToStringLength(const char *pData, int32_t length, int32_t type
static
int32_t
tscMaxLengthOfTagsFields
(
SSqlObj
*
pSql
)
{
SMeterMeta
*
pMeta
=
tscGetMeterMetaInfo
(
&
pSql
->
cmd
,
0
,
0
)
->
pMeterMeta
;
if
(
pMeta
->
meterType
==
TSDB_TABLE_TYPE_SUPER_TABLE
||
pMeta
->
meter
Type
==
TSDB_TABLE_TYPE_NORMAL_TABLE
||
pMeta
->
meter
Type
==
TSDB_TABLE_TYPE_STREAM_TABLE
)
{
if
(
pMeta
->
tableType
==
TSDB_TABLE_TYPE_SUPER_TABLE
||
pMeta
->
table
Type
==
TSDB_TABLE_TYPE_NORMAL_TABLE
||
pMeta
->
table
Type
==
TSDB_TABLE_TYPE_STREAM_TABLE
)
{
return
0
;
}
...
...
src/client/src/tscParseInsert.c
浏览文件 @
31f8cf3f
...
...
@@ -776,7 +776,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
SMeterMetaInfo
*
pSTableMeterMetaInfo
=
tscGetMeterMetaInfoFromQueryInfo
(
pQueryInfo
,
STABLE_INDEX
);
setMeterID
(
pSTableMeterMetaInfo
,
&
sToken
,
pSql
);
strncpy
(
pTag
->
name
,
pSTableMeterMetaInfo
->
name
,
TSDB_
METER
_ID_LEN
);
strncpy
(
pTag
->
name
,
pSTableMeterMetaInfo
->
name
,
TSDB_
TABLE
_ID_LEN
);
code
=
tscGetMeterMeta
(
pSql
,
pSTableMeterMetaInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
@@ -950,7 +950,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
}
int
validateTableName
(
char
*
tblName
,
int
len
)
{
char
buf
[
TSDB_
METER
_ID_LEN
]
=
{
0
};
char
buf
[
TSDB_
TABLE
_ID_LEN
]
=
{
0
};
strncpy
(
buf
,
tblName
,
len
);
SSQLToken
token
=
{.
n
=
len
,
.
type
=
TK_ID
,
.
z
=
buf
};
...
...
@@ -1544,7 +1544,7 @@ void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql) {
continue
;
}
strncpy
(
pMeterMetaInfo
->
name
,
pDataBlock
->
meterId
,
TSDB_
METER
_ID_LEN
);
strncpy
(
pMeterMetaInfo
->
name
,
pDataBlock
->
meterId
,
TSDB_
TABLE
_ID_LEN
);
memset
(
pDataBlock
->
pData
,
0
,
pDataBlock
->
nAllocSize
);
int32_t
ret
=
tscGetMeterMeta
(
pSql
,
pMeterMetaInfo
);
...
...
src/client/src/tscSQLParser.c
浏览文件 @
31f8cf3f
...
...
@@ -1072,11 +1072,11 @@ int32_t setObjFullName(char* fullName, const char* account, SSQLToken* pDB, SSQL
*
xlen
=
totalLen
;
}
if
(
totalLen
<
TSDB_
METER
_ID_LEN
)
{
if
(
totalLen
<
TSDB_
TABLE
_ID_LEN
)
{
fullName
[
totalLen
]
=
0
;
}
return
(
totalLen
<=
TSDB_
METER
_ID_LEN
)
?
TSDB_CODE_SUCCESS
:
TSDB_CODE_INVALID_SQL
;
return
(
totalLen
<=
TSDB_
TABLE
_ID_LEN
)
?
TSDB_CODE_SUCCESS
:
TSDB_CODE_INVALID_SQL
;
}
static
void
extractColumnNameFromString
(
tSQLExprItem
*
pItem
)
{
...
...
@@ -1901,7 +1901,7 @@ int32_t getMeterIndex(SSQLToken* pTableToken, SQueryInfo* pQueryInfo, SColumnInd
}
pIndex
->
tableIndex
=
COLUMN_INDEX_INITIAL_VAL
;
char
tableName
[
TSDB_
METER
_ID_LEN
+
1
]
=
{
0
};
char
tableName
[
TSDB_
TABLE
_ID_LEN
+
1
]
=
{
0
};
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
numOfTables
;
++
i
)
{
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfoFromQueryInfo
(
pQueryInfo
,
i
);
...
...
@@ -3365,7 +3365,7 @@ static int32_t setTableCondForMetricQuery(SQueryInfo* pQueryInfo, const char* ac
SStringBuilder
sb1
=
{
0
};
taosStringBuilderAppendStringLen
(
&
sb1
,
QUERY_COND_REL_PREFIX_IN
,
QUERY_COND_REL_PREFIX_IN_LEN
);
char
db
[
TSDB_
METER
_ID_LEN
]
=
{
0
};
char
db
[
TSDB_
TABLE
_ID_LEN
]
=
{
0
};
// remove the duplicated input table names
int32_t
num
=
0
;
...
...
@@ -3389,7 +3389,7 @@ static int32_t setTableCondForMetricQuery(SQueryInfo* pQueryInfo, const char* ac
taosStringBuilderAppendStringLen
(
&
sb1
,
TBNAME_LIST_SEP
,
1
);
}
char
idBuf
[
TSDB_
METER
_ID_LEN
+
1
]
=
{
0
};
char
idBuf
[
TSDB_
TABLE
_ID_LEN
+
1
]
=
{
0
};
int32_t
xlen
=
strlen
(
segments
[
i
]);
SSQLToken
t
=
{.
z
=
segments
[
i
],
.
n
=
xlen
,
.
type
=
TK_STRING
};
...
...
@@ -5300,7 +5300,7 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
}
// get meter meta from mnode
strncpy
(
pCreateTable
->
usingInfo
.
tagdata
.
name
,
pStableMeterMetaInfo
->
name
,
TSDB_
METER
_ID_LEN
);
strncpy
(
pCreateTable
->
usingInfo
.
tagdata
.
name
,
pStableMeterMetaInfo
->
name
,
TSDB_
TABLE
_ID_LEN
);
tVariantList
*
pList
=
pInfo
->
pCreateTableInfo
->
usingInfo
.
pTagVals
;
int32_t
code
=
tscGetMeterMeta
(
pSql
,
pStableMeterMetaInfo
);
...
...
src/client/src/tscServer.c
浏览文件 @
31f8cf3f
...
...
@@ -2685,7 +2685,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// fill head info
SMgmtHead
*
pMgmt
=
(
SMgmtHead
*
)(
pCmd
->
payload
+
tsRpcHeadSize
);
memset
(
pMgmt
->
db
,
0
,
TSDB_
METER
_ID_LEN
);
// server don't need the db
memset
(
pMgmt
->
db
,
0
,
TSDB_
TABLE
_ID_LEN
);
// server don't need the db
SMultiMeterInfoMsg
*
pInfoMsg
=
(
SMultiMeterInfoMsg
*
)(
pCmd
->
payload
+
tsRpcHeadSize
+
sizeof
(
SMgmtHead
));
pInfoMsg
->
numOfMeters
=
htonl
((
int32_t
)
pCmd
->
count
);
...
...
@@ -2709,7 +2709,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
static
int32_t
tscEstimateMetricMetaMsgSize
(
SSqlCmd
*
pCmd
)
{
const
int32_t
defaultSize
=
minMsgSize
()
+
sizeof
(
S
Metric
MetaMsg
)
+
sizeof
(
SMgmtHead
)
+
sizeof
(
int16_t
)
*
TSDB_MAX_TAGS
;
minMsgSize
()
+
sizeof
(
S
SuperTable
MetaMsg
)
+
sizeof
(
SMgmtHead
)
+
sizeof
(
int16_t
)
*
TSDB_MAX_TAGS
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
int32_t
n
=
0
;
...
...
@@ -2722,7 +2722,7 @@ static int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
tagLen
+=
strlen
(
pQueryInfo
->
tagCond
.
tbnameCond
.
cond
)
*
TSDB_NCHAR_SIZE
;
}
int32_t
joinCondLen
=
(
TSDB_
METER
_ID_LEN
+
sizeof
(
int16_t
))
*
2
;
int32_t
joinCondLen
=
(
TSDB_
TABLE
_ID_LEN
+
sizeof
(
int16_t
))
*
2
;
int32_t
elemSize
=
sizeof
(
SMetricMetaElemMsg
)
*
pQueryInfo
->
numOfTables
;
int32_t
len
=
tagLen
+
joinCondLen
+
elemSize
+
defaultSize
;
...
...
@@ -2731,7 +2731,7 @@ static int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
}
int
tscBuildMetricMetaMsg
(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
)
{
S
Metric
MetaMsg
*
pMetaMsg
;
S
SuperTable
MetaMsg
*
pMetaMsg
;
char
*
pMsg
,
*
pStart
;
int
msgLen
=
0
;
int
tableIndex
=
0
;
...
...
@@ -2757,25 +2757,25 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg
+=
sizeof
(
SMgmtHead
);
pMetaMsg
=
(
S
Metric
MetaMsg
*
)
pMsg
;
pMetaMsg
=
(
S
SuperTable
MetaMsg
*
)
pMsg
;
pMetaMsg
->
numOfMeters
=
htonl
(
pQueryInfo
->
numOfTables
);
pMsg
+=
sizeof
(
S
Metric
MetaMsg
);
pMsg
+=
sizeof
(
S
SuperTable
MetaMsg
);
int32_t
offset
=
pMsg
-
(
char
*
)
pMetaMsg
;
pMetaMsg
->
join
=
htonl
(
offset
);
// todo refactor
pMetaMsg
->
joinCondLen
=
htonl
((
TSDB_
METER
_ID_LEN
+
sizeof
(
int16_t
))
*
2
);
pMetaMsg
->
joinCondLen
=
htonl
((
TSDB_
TABLE
_ID_LEN
+
sizeof
(
int16_t
))
*
2
);
memcpy
(
pMsg
,
pTagCond
->
joinInfo
.
left
.
meterId
,
TSDB_
METER
_ID_LEN
);
pMsg
+=
TSDB_
METER
_ID_LEN
;
memcpy
(
pMsg
,
pTagCond
->
joinInfo
.
left
.
meterId
,
TSDB_
TABLE
_ID_LEN
);
pMsg
+=
TSDB_
TABLE
_ID_LEN
;
*
(
int16_t
*
)
pMsg
=
pTagCond
->
joinInfo
.
left
.
tagCol
;
pMsg
+=
sizeof
(
int16_t
);
memcpy
(
pMsg
,
pTagCond
->
joinInfo
.
right
.
meterId
,
TSDB_
METER
_ID_LEN
);
pMsg
+=
TSDB_
METER
_ID_LEN
;
memcpy
(
pMsg
,
pTagCond
->
joinInfo
.
right
.
meterId
,
TSDB_
TABLE
_ID_LEN
);
pMsg
+=
TSDB_
TABLE
_ID_LEN
;
*
(
int16_t
*
)
pMsg
=
pTagCond
->
joinInfo
.
right
.
tagCol
;
pMsg
+=
sizeof
(
int16_t
);
...
...
@@ -2991,7 +2991,7 @@ int tscProcessMeterMetaRsp(SSqlObj *pSql) {
int32_t
tagLen
=
0
;
SSchema
*
pTagsSchema
=
tsGetTagSchema
(
pMeta
);
if
(
pMeta
->
meter
Type
==
TSDB_TABLE_TYPE_CREATE_FROM_STABLE
)
{
if
(
pMeta
->
table
Type
==
TSDB_TABLE_TYPE_CREATE_FROM_STABLE
)
{
for
(
int32_t
i
=
0
;
i
<
pMeta
->
numOfTags
;
++
i
)
{
tagLen
+=
pTagsSchema
[
i
].
bytes
;
}
...
...
@@ -3106,7 +3106,7 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
int32_t
tagLen
=
0
;
SSchema
*
pTagsSchema
=
tsGetTagSchema
(
pMeta
);
if
(
pMeta
->
meter
Type
==
TSDB_TABLE_TYPE_CREATE_FROM_STABLE
)
{
if
(
pMeta
->
table
Type
==
TSDB_TABLE_TYPE_CREATE_FROM_STABLE
)
{
for
(
int32_t
j
=
0
;
j
<
pMeta
->
numOfTags
;
++
j
)
{
tagLen
+=
pTagsSchema
[
j
].
bytes
;
}
...
...
@@ -3304,7 +3304,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
}
int
tscProcessConnectRsp
(
SSqlObj
*
pSql
)
{
char
temp
[
TSDB_
METER
_ID_LEN
*
2
];
char
temp
[
TSDB_
TABLE
_ID_LEN
*
2
];
SConnectRsp
*
pConnect
;
STscObj
*
pObj
=
pSql
->
pTscObj
;
...
...
src/client/src/tscSql.c
浏览文件 @
31f8cf3f
...
...
@@ -1047,7 +1047,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t
}
char
*
nextStr
;
char
tblName
[
TSDB_
METER
_ID_LEN
];
char
tblName
[
TSDB_
TABLE
_ID_LEN
];
int
payloadLen
=
0
;
char
*
pMsg
=
pCmd
->
payload
;
while
(
1
)
{
...
...
src/client/src/tscUtil.c
浏览文件 @
31f8cf3f
...
...
@@ -657,7 +657,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
dataBuf
->
size
=
startOffset
;
dataBuf
->
tsSource
=
-
1
;
strncpy
(
dataBuf
->
meterId
,
name
,
TSDB_
METER
_ID_LEN
);
strncpy
(
dataBuf
->
meterId
,
name
,
TSDB_
TABLE
_ID_LEN
);
/*
* The metermeta may be released since the metermeta cache are completed clean by other thread
...
...
@@ -1810,7 +1810,7 @@ SMeterMetaInfo* tscAddMeterMetaInfo(SQueryInfo* pQueryInfo, const char* name, SM
assert
(
pMeterMetaInfo
!=
NULL
);
if
(
name
!=
NULL
)
{
assert
(
strlen
(
name
)
<=
TSDB_
METER
_ID_LEN
);
assert
(
strlen
(
name
)
<=
TSDB_
TABLE
_ID_LEN
);
strcpy
(
pMeterMetaInfo
->
name
,
name
);
}
...
...
src/dnode/src/dnodeMgmt.c
浏览文件 @
31f8cf3f
...
...
@@ -312,7 +312,7 @@ int vnodeProcessCreateMeterMsg(char *pMsg, int msgLen) {
pObj
->
vnode
=
pCreate
->
vnode
;
pObj
->
sid
=
pCreate
->
sid
;
pObj
->
uid
=
pCreate
->
uid
;
memcpy
(
pObj
->
meterId
,
pCreate
->
meterId
,
TSDB_
METER
_ID_LEN
);
memcpy
(
pObj
->
meterId
,
pCreate
->
meterId
,
TSDB_
TABLE
_ID_LEN
);
pObj
->
numOfColumns
=
pCreate
->
numOfColumns
;
pObj
->
timeStamp
=
pCreate
->
timeStamp
;
pObj
->
sversion
=
htonl
(
pCreate
->
sversion
);
...
...
@@ -377,7 +377,7 @@ int vnodeProcessRemoveMeterRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj) {
pObj
=
vnodeList
[
pRemove
->
vnode
].
meterList
[
pRemove
->
sid
];
if
(
pObj
==
NULL
)
goto
_remove_over
;
if
(
memcmp
(
pObj
->
meterId
,
pRemove
->
meterId
,
TSDB_
METER
_ID_LEN
)
!=
0
)
{
if
(
memcmp
(
pObj
->
meterId
,
pRemove
->
meterId
,
TSDB_
TABLE
_ID_LEN
)
!=
0
)
{
dWarn
(
"vid:%d sid:%d id:%s, remove ID:%s, meter ID not matched"
,
pObj
->
vnode
,
pObj
->
sid
,
pObj
->
meterId
,
pRemove
->
meterId
);
goto
_remove_over
;
...
...
src/inc/mnode.h
浏览文件 @
31f8cf3f
...
...
@@ -95,7 +95,7 @@ typedef struct {
}
SMeterGid
;
typedef
struct
_tab_obj
{
char
meterId
[
TSDB_
METER
_ID_LEN
+
1
];
char
meterId
[
TSDB_
TABLE
_ID_LEN
+
1
];
uint64_t
uid
;
SMeterGid
gid
;
...
...
@@ -106,7 +106,7 @@ typedef struct _tab_obj {
int32_t
numOfColumns
;
int32_t
schemaSize
;
short
nextColId
;
char
meter
Type
:
4
;
char
table
Type
:
4
;
char
status
:
3
;
char
isDirty
:
1
;
// if the table change tag column 1 value
char
reserved
[
15
];
...
...
@@ -116,7 +116,7 @@ typedef struct _tab_obj {
tSkipList
*
pSkipList
;
struct
_tab_obj
*
pHead
;
// for metric, a link list for all meters created
// according to this metric
char
*
pTagData
;
// TSDB_
METER
_ID_LEN(metric_name)+
char
*
pTagData
;
// TSDB_
TABLE
_ID_LEN(metric_name)+
// tags_value1/tags_value2/tags_value3
struct
_tab_obj
*
prev
,
*
next
;
char
*
pSql
;
// pointer to SQL, for SC, null-terminated string
...
...
@@ -262,8 +262,8 @@ extern SDnodeObj dnodeObj;
// dnodeInt API
int
mgmtInitDnodeInt
();
void
mgmtCleanUpDnodeInt
();
int
mgmtSendCreateMsgToVgroup
(
STabObj
*
p
Meter
,
SVgObj
*
pVgroup
);
int
mgmtSendRemoveMeterMsgToDnode
(
STabObj
*
p
Meter
,
SVgObj
*
pVgroup
);
int
mgmtSendCreateMsgToVgroup
(
STabObj
*
p
Table
,
SVgObj
*
pVgroup
);
int
mgmtSendRemoveMeterMsgToDnode
(
STabObj
*
p
Table
,
SVgObj
*
pVgroup
);
int
mgmtSendVPeersMsg
(
SVgObj
*
pVgroup
);
int
mgmtSendFreeVnodeMsg
(
SVgObj
*
pVgroup
);
int
mgmtSendOneFreeVnodeMsg
(
SVnodeGid
*
pVnodeGid
);
...
...
@@ -284,8 +284,8 @@ int mgmtRetrieveUsers(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
void
mgmtCleanUpUsers
();
// metric API
int
mgmtAddMeterIntoMetric
(
STabObj
*
pMetric
,
STabObj
*
p
Meter
);
int
mgmtRemoveMeterFromMetric
(
STabObj
*
pMetric
,
STabObj
*
p
Meter
);
int
mgmtAddMeterIntoMetric
(
STabObj
*
pMetric
,
STabObj
*
p
Table
);
int
mgmtRemoveMeterFromMetric
(
STabObj
*
pMetric
,
STabObj
*
p
Table
);
int
mgmtGetMetricMeta
(
SMeterMeta
*
pMeta
,
SShowObj
*
pShow
,
SConnObj
*
pConn
);
int
mgmtRetrieveMetrics
(
SShowObj
*
pShow
,
char
*
data
,
int
rows
,
SConnObj
*
pConn
);
...
...
@@ -326,14 +326,14 @@ void mgmtCleanUpVgroups();
int
mgmtInitMeters
();
STabObj
*
mgmtGetTable
(
char
*
meterId
);
STabObj
*
mgmtGetTableInfo
(
char
*
src
,
char
*
tags
[]);
int
mgmtRetrieveMetricMeta
(
SConnObj
*
pConn
,
char
**
pStart
,
S
Metric
MetaMsg
*
pInfo
);
int
mgmtRetrieveMetricMeta
(
SConnObj
*
pConn
,
char
**
pStart
,
S
SuperTable
MetaMsg
*
pInfo
);
int
mgmtCreateMeter
(
SDbObj
*
pDb
,
SCreateTableMsg
*
pCreate
);
int
mgmtDropMeter
(
SDbObj
*
pDb
,
char
*
meterId
,
int
ignore
);
int
mgmtAlterMeter
(
SDbObj
*
pDb
,
SAlterTableMsg
*
pAlter
);
int
mgmtGetTableMeta
(
SMeterMeta
*
pMeta
,
SShowObj
*
pShow
,
SConnObj
*
pConn
);
int
mgmtRetrieveMeters
(
SShowObj
*
pShow
,
char
*
data
,
int
rows
,
SConnObj
*
pConn
);
void
mgmtCleanUpMeters
();
SSchema
*
mgmtGetTableSchema
(
STabObj
*
p
Meter
);
// get schema for a meter
SSchema
*
mgmtGetTableSchema
(
STabObj
*
p
Table
);
// get schema for a meter
// dnode API
...
...
src/inc/taosmsg.h
浏览文件 @
31f8cf3f
...
...
@@ -301,7 +301,7 @@ typedef struct {
uint64_t
uid
;
char
spi
;
char
encrypt
;
char
meterId
[
TSDB_
METER
_ID_LEN
];
char
meterId
[
TSDB_
TABLE
_ID_LEN
];
char
secret
[
TSDB_KEY_LEN
];
char
cipheringKey
[
TSDB_KEY_LEN
];
uint64_t
timeStamp
;
...
...
@@ -314,7 +314,7 @@ typedef struct {
}
SCreateMsg
;
typedef
struct
{
char
db
[
TSDB_
METER
_ID_LEN
];
char
db
[
TSDB_
TABLE
_ID_LEN
];
uint8_t
ignoreNotExists
;
}
SDropDbMsg
,
SUseDbMsg
;
...
...
@@ -327,7 +327,7 @@ typedef struct {
}
SShowTableMsg
;
typedef
struct
{
char
meterId
[
TSDB_
METER
_ID_LEN
];
char
meterId
[
TSDB_
TABLE
_ID_LEN
];
char
igExists
;
short
numOfTags
;
...
...
@@ -341,12 +341,12 @@ typedef struct {
}
SCreateTableMsg
;
typedef
struct
{
char
meterId
[
TSDB_
METER
_ID_LEN
];
char
meterId
[
TSDB_
TABLE
_ID_LEN
];
char
igNotExists
;
}
SDropTableMsg
;
typedef
struct
{
char
meterId
[
TSDB_
METER
_ID_LEN
];
char
meterId
[
TSDB_
TABLE
_ID_LEN
];
short
type
;
/* operation type */
char
tagVal
[
TSDB_MAX_BYTES_PER_ROW
];
short
numOfCols
;
/* number of schema */
...
...
@@ -355,7 +355,7 @@ typedef struct {
typedef
struct
{
char
clientVersion
[
TSDB_VERSION_LEN
];
char
db
[
TSDB_
METER
_ID_LEN
];
char
db
[
TSDB_
TABLE
_ID_LEN
];
}
SConnectMsg
;
typedef
struct
{
...
...
@@ -386,7 +386,7 @@ typedef struct {
}
SCreateUserMsg
,
SAlterUserMsg
;
typedef
struct
{
char
db
[
TSDB_
METER
_ID_LEN
];
char
db
[
TSDB_
TABLE
_ID_LEN
];
}
SMgmtHead
;
typedef
struct
{
...
...
@@ -400,7 +400,7 @@ typedef struct {
short
vnode
;
int32_t
sid
;
uint64_t
uid
;
char
meterId
[
TSDB_
METER
_ID_LEN
];
char
meterId
[
TSDB_
TABLE
_ID_LEN
];
}
SRemoveMeterMsg
;
typedef
struct
{
...
...
@@ -604,7 +604,7 @@ typedef struct {
* the message is too large, so it may will overwrite the cfg information in meterobj.v*
* recover to origin codes
*/
//char db[TSDB_
METER
_ID_LEN+2]; // 8bytes align
//char db[TSDB_
TABLE
_ID_LEN+2]; // 8bytes align
char
db
[
TSDB_DB_NAME_LEN
];
uint32_t
vgId
;
int32_t
maxSessions
;
...
...
@@ -695,7 +695,7 @@ typedef struct {
}
SVPeersMsg
;
typedef
struct
{
char
meterId
[
TSDB_
METER
_ID_LEN
];
char
meterId
[
TSDB_
TABLE
_ID_LEN
];
short
createFlag
;
char
tags
[];
}
SMeterInfoMsg
;
...
...
@@ -708,7 +708,7 @@ typedef struct {
typedef
struct
{
int16_t
elemLen
;
char
meterId
[
TSDB_
METER
_ID_LEN
];
char
meterId
[
TSDB_
TABLE
_ID_LEN
];
int16_t
orderIndex
;
int16_t
orderType
;
// used in group by xx order by xxx
...
...
@@ -732,7 +732,7 @@ typedef struct {
int32_t
join
;
int32_t
joinCondLen
;
// for join condition
int32_t
metaElem
[
TSDB_MAX_JOIN_TABLE_NUM
];
}
S
Metric
MetaMsg
;
}
S
SuperTable
MetaMsg
;
typedef
struct
{
SVPeerDesc
vpeerDesc
[
TSDB_VNODES_SUPPORT
];
...
...
@@ -751,7 +751,7 @@ typedef struct {
typedef
struct
SMeterMeta
{
uint8_t
numOfTags
:
6
;
uint8_t
precision
:
2
;
uint8_t
meter
Type
:
4
;
uint8_t
table
Type
:
4
;
uint8_t
index
:
4
;
// used locally
int16_t
numOfColumns
;
...
...
@@ -767,12 +767,12 @@ typedef struct SMeterMeta {
}
SMeterMeta
;
typedef
struct
SMultiMeterMeta
{
char
meterId
[
TSDB_
METER
_ID_LEN
];
// note: This field must be at the front
char
meterId
[
TSDB_
TABLE
_ID_LEN
];
// note: This field must be at the front
SMeterMeta
meta
;
}
SMultiMeterMeta
;
typedef
struct
{
char
name
[
TSDB_
METER
_ID_LEN
];
char
name
[
TSDB_
TABLE
_ID_LEN
];
char
data
[
TSDB_MAX_TAGS_LEN
];
}
STagData
;
...
...
src/inc/tsdb.h
浏览文件 @
31f8cf3f
...
...
@@ -86,7 +86,7 @@ extern "C" {
#define TS_PATH_DELIMITER_LEN 1
#define TSDB_METER_ID_LEN_MARGIN 10
#define TSDB_
METER
_ID_LEN (TSDB_DB_NAME_LEN+TSDB_METER_NAME_LEN+2*TS_PATH_DELIMITER_LEN+TSDB_USERID_LEN+TSDB_METER_ID_LEN_MARGIN) //TSDB_DB_NAME_LEN+TSDB_METER_NAME_LEN+2*strlen(TS_PATH_DELIMITER)+strlen(USERID)
#define TSDB_
TABLE
_ID_LEN (TSDB_DB_NAME_LEN+TSDB_METER_NAME_LEN+2*TS_PATH_DELIMITER_LEN+TSDB_USERID_LEN+TSDB_METER_ID_LEN_MARGIN) //TSDB_DB_NAME_LEN+TSDB_METER_NAME_LEN+2*strlen(TS_PATH_DELIMITER)+strlen(USERID)
#define TSDB_UNI_LEN 24
#define TSDB_USER_LEN TSDB_UNI_LEN
#define TSDB_ACCT_LEN TSDB_UNI_LEN
...
...
src/mnode/inc/mgmtSupertableQuery.h
0 → 100644
浏览文件 @
31f8cf3f
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TBASE_MNODE_SUPER_TABLE_QUERY_H
#define TBASE_MNODE_SUPER_TABLE_QUERY_H
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include "mnode.h"
int32_t
mgmtRetrieveMetersFromSuperTable
(
SSuperTableMetaMsg
*
pInfo
,
int32_t
tableIndex
,
tQueryResultset
*
pRes
);
int32_t
mgmtDoJoin
(
SSuperTableMetaMsg
*
pSuperTableMetaMsg
,
tQueryResultset
*
pRes
);
void
mgmtReorganizeMetersInMetricMeta
(
SSuperTableMetaMsg
*
pInfo
,
int32_t
index
,
tQueryResultset
*
pRes
);
#endif
src/mnode/inc/mgmtTable.h
浏览文件 @
31f8cf3f
...
...
@@ -11,28 +11,16 @@
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include "tast.h"
*/
#ifndef TBASE_MNODE_
UTIL
_H
#define TBASE_MNODE_
UTIL
_H
#ifndef TBASE_MNODE_
TABLE
_H
#define TBASE_MNODE_
TABLE
_H
bool
mgmtTableCreateFromSuperTable
(
STabObj
*
pTableObj
);
bool
mgmtIsSuperTable
(
STabObj
*
pTableObj
);
bool
mgmtIsNormalTable
(
STabObj
*
pTableObj
);
typedef
struct
SSyntaxTreeFilterSupporter
{
SSchema
*
pTagSchema
;
int32_t
numOfTags
;
int32_t
optr
;
}
SSyntaxTreeFilterSupporter
;
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include "mnode.h"
char
*
mgmtTableGetTag
(
STabObj
*
pMeter
,
int32_t
col
,
SSchema
*
pTagColSchema
);
int32_t
mgmtGetTagsLength
(
STabObj
*
pMetric
,
int32_t
col
);
bool
mgmtCheckIsMonitorDB
(
char
*
db
,
char
*
monitordb
);
int32_t
mgmtCheckDBParams
(
SCreateDbMsg
*
pCreate
);
int32_t
mgmtFindTagCol
(
STabObj
*
pTable
,
const
char
*
tagName
);
#endif
src/mnode/inc/mgmtUtil.h
浏览文件 @
31f8cf3f
...
...
@@ -11,37 +11,22 @@
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include "tast.h"
*/
#ifndef TBASE_MNODE_UTIL_H
#define TBASE_MNODE_UTIL_H
bool
mgmtTableCreateFromSuperTable
(
STabObj
*
pTableObj
);
bool
mgmtIsSuperTable
(
STabObj
*
pTableObj
);
bool
mgmtIsNormalTable
(
STabObj
*
pTableObj
);
typedef
struct
SSyntaxTreeFilterSupporter
{
SSchema
*
pTagSchema
;
int32_t
numOfTags
;
int32_t
optr
;
}
SSyntaxTreeFilterSupporter
;
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include "mnode.h"
char
*
mgmtTableGetTag
(
STabObj
*
pMeter
,
int32_t
col
,
SSchema
*
pTagColSchema
);
int32_t
mgmtGetTagsLength
(
STabObj
*
pMetric
,
int32_t
col
);
bool
mgmtTableCreateFromSuperTable
(
STabObj
*
pTableObj
);
bool
mgmtIsSuperTable
(
STabObj
*
pTableObj
);
bool
mgmtIsNormalTable
(
STabObj
*
pTableObj
);
char
*
mgmtTableGetTag
(
STabObj
*
pTable
,
int32_t
col
,
SSchema
*
pTagColSchema
);
int32_t
mgmtGetTagsLength
(
STabObj
*
pSuperTable
,
int32_t
col
);
bool
mgmtCheckIsMonitorDB
(
char
*
db
,
char
*
monitordb
);
int32_t
mgmtCheckDBParams
(
SCreateDbMsg
*
pCreate
);
int32_t
mgmtFindTagCol
(
STabObj
*
pMetric
,
const
char
*
tagName
);
int32_t
mgmtRetrieveMetersFromMetric
(
SMetricMetaMsg
*
pInfo
,
int32_t
tableIndex
,
tQueryResultset
*
pRes
);
int32_t
mgmtDoJoin
(
SMetricMetaMsg
*
pMetricMetaMsg
,
tQueryResultset
*
pRes
);
void
mgmtReorganizeMetersInMetricMeta
(
SMetricMetaMsg
*
pInfo
,
int32_t
index
,
tQueryResultset
*
pRes
);
bool
tSkipListNodeFilterCallback
(
const
void
*
pNode
,
void
*
param
);
#endif
src/mnode/src/mgmtConn.c
浏览文件 @
31f8cf3f
...
...
@@ -20,7 +20,7 @@
#include "tschemautil.h"
typedef
struct
{
char
user
[
TSDB_
METER
_ID_LEN
];
char
user
[
TSDB_
TABLE
_ID_LEN
];
uint64_t
stime
;
uint32_t
ip
;
uint16_t
port
;
...
...
src/mnode/src/mgmtDb.c
浏览文件 @
31f8cf3f
...
...
@@ -116,7 +116,7 @@ int mgmtInitDbs() {
SDbObj
*
mgmtGetDb
(
char
*
db
)
{
return
(
SDbObj
*
)
sdbGetRow
(
dbSdb
,
db
);
}
SDbObj
*
mgmtGetDbByMeterId
(
char
*
meterId
)
{
char
db
[
TSDB_
METER
_ID_LEN
],
*
pos
;
char
db
[
TSDB_
TABLE
_ID_LEN
],
*
pos
;
pos
=
strstr
(
meterId
,
TS_PATH_DELIMITER
);
pos
=
strstr
(
pos
+
1
,
TS_PATH_DELIMITER
);
...
...
src/mnode/src/mgmtDnodeInt.c
浏览文件 @
31f8cf3f
...
...
@@ -24,7 +24,7 @@
void
mgmtProcessMsgFromDnode
(
char
*
content
,
int
msgLen
,
int
msgType
,
SDnodeObj
*
pObj
);
int
mgmtSendVPeersMsg
(
SVgObj
*
pVgroup
);
char
*
mgmtBuildVpeersIe
(
char
*
pMsg
,
SVgObj
*
pVgroup
,
int
vnode
);
char
*
mgmtBuildCreateMeterIe
(
STabObj
*
p
Meter
,
char
*
pMsg
,
int
vnode
);
char
*
mgmtBuildCreateMeterIe
(
STabObj
*
p
Table
,
char
*
pMsg
,
int
vnode
);
/*
* functions for communicate between dnode and mnode
...
...
@@ -39,7 +39,7 @@ int taosSendMsgToDnode(SDnodeObj *pObj, char *msg, int msgLen);
int
mgmtProcessMeterCfgMsg
(
char
*
cont
,
int
contLen
,
SDnodeObj
*
pObj
)
{
char
*
pMsg
,
*
pStart
;
int
msgLen
=
0
;
STabObj
*
p
Meter
=
NULL
;
STabObj
*
p
Table
=
NULL
;
SMeterCfgMsg
*
pCfg
=
(
SMeterCfgMsg
*
)
cont
;
SVgObj
*
pVgroup
;
...
...
@@ -63,13 +63,13 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
int
vgId
=
pObj
->
vload
[
vnode
].
vgId
;
pVgroup
=
mgmtGetVgroup
(
vgId
);
if
(
pVgroup
)
p
Meter
=
pVgroup
->
meterList
[
sid
];
if
(
pVgroup
)
p
Table
=
pVgroup
->
meterList
[
sid
];
}
if
(
p
Meter
)
{
if
(
p
Table
)
{
*
pMsg
=
0
;
// code
pMsg
++
;
pMsg
=
mgmtBuildCreateMeterIe
(
p
Meter
,
pMsg
,
vnode
);
pMsg
=
mgmtBuildCreateMeterIe
(
p
Table
,
pMsg
,
vnode
);
}
else
{
mTrace
(
"dnode:%s, vnode:%d sid:%d, meter not there"
,
taosIpStr
(
pObj
->
privateIp
),
vnode
,
sid
);
*
pMsg
=
TSDB_CODE_INVALID_METER_ID
;
...
...
@@ -187,48 +187,48 @@ void mgmtProcessMsgFromDnode(char *content, int msgLen, int msgType, SDnodeObj *
}
}
char
*
mgmtBuildCreateMeterIe
(
STabObj
*
p
Meter
,
char
*
pMsg
,
int
vnode
)
{
char
*
mgmtBuildCreateMeterIe
(
STabObj
*
p
Table
,
char
*
pMsg
,
int
vnode
)
{
SCreateMsg
*
pCreateMeter
;
pCreateMeter
=
(
SCreateMsg
*
)
pMsg
;
pCreateMeter
->
vnode
=
htons
(
vnode
);
pCreateMeter
->
sid
=
htonl
(
p
Meter
->
gid
.
sid
);
pCreateMeter
->
uid
=
p
Meter
->
uid
;
memcpy
(
pCreateMeter
->
meterId
,
p
Meter
->
meterId
,
TSDB_METER
_ID_LEN
);
pCreateMeter
->
sid
=
htonl
(
p
Table
->
gid
.
sid
);
pCreateMeter
->
uid
=
p
Table
->
uid
;
memcpy
(
pCreateMeter
->
meterId
,
p
Table
->
meterId
,
TSDB_TABLE
_ID_LEN
);
// pCreateMeter->lastCreate = htobe64(pVgroup->lastCreate);
pCreateMeter
->
timeStamp
=
htobe64
(
p
Meter
->
createdTime
);
pCreateMeter
->
timeStamp
=
htobe64
(
p
Table
->
createdTime
);
/*
pCreateMeter->spi = pSec->spi;
pCreateMeter->encrypt = pSec->encrypt;
memcpy(pCreateMeter->cipheringKey, pSec->cipheringKey, TSDB_KEY_LEN);
memcpy(pCreateMeter->secret, pSec->secret, TSDB_KEY_LEN);
*/
pCreateMeter
->
sversion
=
htonl
(
p
Meter
->
sversion
);
pCreateMeter
->
numOfColumns
=
htons
(
p
Meter
->
numOfColumns
);
SSchema
*
pSchema
=
mgmtGetTableSchema
(
p
Meter
);
pCreateMeter
->
sversion
=
htonl
(
p
Table
->
sversion
);
pCreateMeter
->
numOfColumns
=
htons
(
p
Table
->
numOfColumns
);
SSchema
*
pSchema
=
mgmtGetTableSchema
(
p
Table
);
for
(
int
i
=
0
;
i
<
p
Meter
->
numOfColumns
;
++
i
)
{
for
(
int
i
=
0
;
i
<
p
Table
->
numOfColumns
;
++
i
)
{
pCreateMeter
->
schema
[
i
].
type
=
pSchema
[
i
].
type
;
/* strcpy(pCreateMeter->schema[i].name, pSchema[i].name); */
pCreateMeter
->
schema
[
i
].
bytes
=
htons
(
pSchema
[
i
].
bytes
);
pCreateMeter
->
schema
[
i
].
colId
=
htons
(
pSchema
[
i
].
colId
);
}
pMsg
=
((
char
*
)(
pCreateMeter
->
schema
))
+
p
Meter
->
numOfColumns
*
sizeof
(
SMColumn
);
pMsg
=
((
char
*
)(
pCreateMeter
->
schema
))
+
p
Table
->
numOfColumns
*
sizeof
(
SMColumn
);
pCreateMeter
->
sqlLen
=
0
;
if
(
p
Meter
->
pSql
)
{
int
len
=
strlen
(
p
Meter
->
pSql
)
+
1
;
if
(
p
Table
->
pSql
)
{
int
len
=
strlen
(
p
Table
->
pSql
)
+
1
;
pCreateMeter
->
sqlLen
=
htons
(
len
);
strcpy
(
pMsg
,
p
Meter
->
pSql
);
strcpy
(
pMsg
,
p
Table
->
pSql
);
pMsg
+=
len
;
}
return
pMsg
;
}
int
mgmtSendCreateMsgToVgroup
(
STabObj
*
p
Meter
,
SVgObj
*
pVgroup
)
{
int
mgmtSendCreateMsgToVgroup
(
STabObj
*
p
Table
,
SVgObj
*
pVgroup
)
{
char
*
pMsg
,
*
pStart
;
int
i
,
msgLen
=
0
;
SDnodeObj
*
pObj
;
...
...
@@ -244,7 +244,7 @@ int mgmtSendCreateMsgToVgroup(STabObj *pMeter, SVgObj *pVgroup) {
pStart
=
taosBuildReqMsgToDnodeWithSize
(
pObj
,
TSDB_MSG_TYPE_CREATE
,
64000
);
if
(
pStart
==
NULL
)
continue
;
pMsg
=
mgmtBuildCreateMeterIe
(
p
Meter
,
pStart
,
pVgroup
->
vnodeGid
[
i
].
vnode
);
pMsg
=
mgmtBuildCreateMeterIe
(
p
Table
,
pStart
,
pVgroup
->
vnodeGid
[
i
].
vnode
);
msgLen
=
pMsg
-
pStart
;
taosSendMsgToDnode
(
pObj
,
pStart
,
msgLen
);
...
...
@@ -255,7 +255,7 @@ int mgmtSendCreateMsgToVgroup(STabObj *pMeter, SVgObj *pVgroup) {
return
0
;
}
int
mgmtSendRemoveMeterMsgToDnode
(
STabObj
*
p
Meter
,
SVgObj
*
pVgroup
)
{
int
mgmtSendRemoveMeterMsgToDnode
(
STabObj
*
p
Table
,
SVgObj
*
pVgroup
)
{
SRemoveMeterMsg
*
pRemove
;
char
*
pMsg
,
*
pStart
;
int
i
,
msgLen
=
0
;
...
...
@@ -277,8 +277,8 @@ int mgmtSendRemoveMeterMsgToDnode(STabObj *pMeter, SVgObj *pVgroup) {
pRemove
=
(
SRemoveMeterMsg
*
)
pMsg
;
pRemove
->
vnode
=
htons
(
pVgroup
->
vnodeGid
[
i
].
vnode
);
pRemove
->
sid
=
htonl
(
p
Meter
->
gid
.
sid
);
memcpy
(
pRemove
->
meterId
,
p
Meter
->
meterId
,
TSDB_METER
_ID_LEN
);
pRemove
->
sid
=
htonl
(
p
Table
->
gid
.
sid
);
memcpy
(
pRemove
->
meterId
,
p
Table
->
meterId
,
TSDB_TABLE
_ID_LEN
);
pMsg
+=
sizeof
(
SRemoveMeterMsg
);
msgLen
=
pMsg
-
pStart
;
...
...
@@ -287,7 +287,7 @@ int mgmtSendRemoveMeterMsgToDnode(STabObj *pMeter, SVgObj *pVgroup) {
tinet_ntoa
(
ipstr
,
pVgroup
->
vnodeGid
[
i
].
ip
);
mTrace
(
"dnode:%s vid:%d, send remove meter msg, sid:%d status:%d"
,
ipstr
,
pVgroup
->
vnodeGid
[
i
].
vnode
,
p
Meter
->
gid
.
sid
,
pObj
->
status
);
p
Table
->
gid
.
sid
,
pObj
->
status
);
}
pVgroup
->
lastRemove
=
timeStamp
;
...
...
@@ -295,7 +295,7 @@ int mgmtSendRemoveMeterMsgToDnode(STabObj *pMeter, SVgObj *pVgroup) {
return
0
;
}
int
mgmtSendAlterStreamMsgToDnode
(
STabObj
*
p
Meter
,
SVgObj
*
pVgroup
)
{
int
mgmtSendAlterStreamMsgToDnode
(
STabObj
*
p
Table
,
SVgObj
*
pVgroup
)
{
SAlterStreamMsg
*
pAlter
;
char
*
pMsg
,
*
pStart
;
int
i
,
msgLen
=
0
;
...
...
@@ -313,9 +313,9 @@ int mgmtSendAlterStreamMsgToDnode(STabObj *pMeter, SVgObj *pVgroup) {
pAlter
=
(
SAlterStreamMsg
*
)
pMsg
;
pAlter
->
vnode
=
htons
(
pVgroup
->
vnodeGid
[
i
].
vnode
);
pAlter
->
sid
=
htonl
(
p
Meter
->
gid
.
sid
);
pAlter
->
uid
=
p
Meter
->
uid
;
pAlter
->
status
=
p
Meter
->
status
;
pAlter
->
sid
=
htonl
(
p
Table
->
gid
.
sid
);
pAlter
->
uid
=
p
Table
->
uid
;
pAlter
->
status
=
p
Table
->
status
;
pMsg
+=
sizeof
(
SAlterStreamMsg
);
msgLen
=
pMsg
-
pStart
;
...
...
src/mnode/src/mgmtMeter.c
浏览文件 @
31f8cf3f
此差异已折叠。
点击以展开。
src/mnode/src/mgmtProfile.c
浏览文件 @
31f8cf3f
...
...
@@ -24,7 +24,7 @@
typedef
struct
{
uint32_t
ip
;
uint16_t
port
;
char
user
[
TSDB_
METER
_ID_LEN
];
char
user
[
TSDB_
TABLE
_ID_LEN
];
}
SCDesc
;
typedef
struct
{
...
...
src/mnode/src/mgmtShell.c
浏览文件 @
31f8cf3f
...
...
@@ -118,7 +118,7 @@ static void mgmtSetSchemaFromMeters(SSchema *pSchema, STabObj *pMeterObj, uint32
static
uint32_t
mgmtSetMeterTagValue
(
char
*
pTags
,
STabObj
*
pMetric
,
STabObj
*
pMeterObj
)
{
SSchema
*
pTagSchema
=
(
SSchema
*
)(
pMetric
->
schema
+
pMetric
->
numOfColumns
*
sizeof
(
SSchema
));
char
*
tagVal
=
pMeterObj
->
pTagData
+
TSDB_
METER
_ID_LEN
;
// tag start position
char
*
tagVal
=
pMeterObj
->
pTagData
+
TSDB_
TABLE
_ID_LEN
;
// tag start position
uint32_t
tagsLen
=
0
;
for
(
int32_t
i
=
0
;
i
<
pMetric
->
numOfTags
;
++
i
)
{
...
...
@@ -224,8 +224,8 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
int32_t
code
=
mgmtCreateMeter
(
pDb
,
pCreateMsg
);
char
stableName
[
TSDB_
METER
_ID_LEN
]
=
{
0
};
strncpy
(
stableName
,
pInfo
->
tags
,
TSDB_
METER
_ID_LEN
);
char
stableName
[
TSDB_
TABLE
_ID_LEN
]
=
{
0
};
strncpy
(
stableName
,
pInfo
->
tags
,
TSDB_
TABLE
_ID_LEN
);
mTrace
(
"meter:%s is automatically created by %s from %s, code:%d"
,
pCreateMsg
->
meterId
,
pConn
->
pUser
->
user
,
stableName
,
code
);
...
...
@@ -274,7 +274,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
pMeta
->
numOfTags
=
pMeterObj
->
numOfTags
;
pMeta
->
numOfColumns
=
htons
(
pMeterObj
->
numOfColumns
);
pMeta
->
meterType
=
pMeterObj
->
meter
Type
;
pMeta
->
tableType
=
pMeterObj
->
table
Type
;
pMsg
+=
sizeof
(
SMeterMeta
);
pSchema
=
(
SSchema
*
)
pMsg
;
// schema locates at the end of SMeterMeta struct
...
...
@@ -361,7 +361,7 @@ int mgmtProcessMultiMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
}
int32_t
totalNum
=
0
;
char
tblName
[
TSDB_
METER
_ID_LEN
];
char
tblName
[
TSDB_
TABLE
_ID_LEN
];
char
*
nextStr
;
char
*
pCurMeter
=
pStart
+
sizeof
(
STaosRsp
)
+
sizeof
(
SMultiMeterInfoMsg
)
+
1
;
// 1: ie type byte
...
...
@@ -414,7 +414,7 @@ int mgmtProcessMultiMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
pMeta
->
meta
.
precision
=
pDbObj
->
cfg
.
precision
;
pMeta
->
meta
.
numOfTags
=
pMeterObj
->
numOfTags
;
pMeta
->
meta
.
numOfColumns
=
htons
(
pMeterObj
->
numOfColumns
);
pMeta
->
meta
.
meterType
=
pMeterObj
->
meter
Type
;
pMeta
->
meta
.
tableType
=
pMeterObj
->
table
Type
;
pCurMeter
+=
sizeof
(
SMultiMeterMeta
);
pSchema
=
(
SSchema
*
)
pCurMeter
;
// schema locates at the end of SMeterMeta struct
...
...
@@ -494,21 +494,21 @@ _exit_code:
}
int
mgmtProcessMetricMetaMsg
(
char
*
pMsg
,
int
msgLen
,
SConnObj
*
pConn
)
{
S
MetricMetaMsg
*
pMetricMetaMsg
=
(
SMetric
MetaMsg
*
)
pMsg
;
S
SuperTableMetaMsg
*
pSuperTableMetaMsg
=
(
SSuperTable
MetaMsg
*
)
pMsg
;
STabObj
*
pMetric
;
STaosRsp
*
pRsp
;
char
*
pStart
;
p
MetricMetaMsg
->
numOfMeters
=
htonl
(
pMetric
MetaMsg
->
numOfMeters
);
p
SuperTableMetaMsg
->
numOfMeters
=
htonl
(
pSuperTable
MetaMsg
->
numOfMeters
);
p
MetricMetaMsg
->
join
=
htonl
(
pMetric
MetaMsg
->
join
);
p
MetricMetaMsg
->
joinCondLen
=
htonl
(
pMetric
MetaMsg
->
joinCondLen
);
p
SuperTableMetaMsg
->
join
=
htonl
(
pSuperTable
MetaMsg
->
join
);
p
SuperTableMetaMsg
->
joinCondLen
=
htonl
(
pSuperTable
MetaMsg
->
joinCondLen
);
for
(
int32_t
i
=
0
;
i
<
p
Metric
MetaMsg
->
numOfMeters
;
++
i
)
{
p
MetricMetaMsg
->
metaElem
[
i
]
=
htonl
(
pMetric
MetaMsg
->
metaElem
[
i
]);
for
(
int32_t
i
=
0
;
i
<
p
SuperTable
MetaMsg
->
numOfMeters
;
++
i
)
{
p
SuperTableMetaMsg
->
metaElem
[
i
]
=
htonl
(
pSuperTable
MetaMsg
->
metaElem
[
i
]);
}
SMetricMetaElemMsg
*
pElem
=
(
SMetricMetaElemMsg
*
)(((
char
*
)
p
MetricMetaMsg
)
+
pMetric
MetaMsg
->
metaElem
[
0
]);
SMetricMetaElemMsg
*
pElem
=
(
SMetricMetaElemMsg
*
)(((
char
*
)
p
SuperTableMetaMsg
)
+
pSuperTable
MetaMsg
->
metaElem
[
0
]);
pMetric
=
mgmtGetTable
(
pElem
->
meterId
);
SDbObj
*
pDb
=
NULL
;
...
...
@@ -531,7 +531,7 @@ int mgmtProcessMetricMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
msgLen
=
pMsg
-
pStart
;
}
else
{
msgLen
=
mgmtRetrieveMetricMeta
(
pConn
,
&
pStart
,
p
Metric
MetaMsg
);
msgLen
=
mgmtRetrieveMetricMeta
(
pConn
,
&
pStart
,
p
SuperTable
MetaMsg
);
if
(
msgLen
<=
0
)
{
taosSendSimpleRsp
(
pConn
->
thandle
,
TSDB_MSG_TYPE_METRIC_META_RSP
,
TSDB_CODE_SERV_OUT_OF_MEMORY
);
return
0
;
...
...
@@ -1076,11 +1076,11 @@ int mgmtProcessCreateTableMsg(char *pMsg, int msgLen, SConnObj *pConn) {
}
else
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
==
TSDB_CODE_TABLE_ALREADY_EXIST
)
{
// table already created when the second attempt to create table
STabObj
*
p
Meter
=
mgmtGetTable
(
pCreate
->
meterId
);
assert
(
p
Meter
!=
NULL
);
STabObj
*
p
Table
=
mgmtGetTable
(
pCreate
->
meterId
);
assert
(
p
Table
!=
NULL
);
mWarn
(
"table:%s, table already created, failed to create table, ts:%"
PRId64
", code:%d"
,
pCreate
->
meterId
,
p
Meter
->
createdTime
,
code
);
p
Table
->
createdTime
,
code
);
}
else
{
// other errors
mError
(
"table:%s, failed to create table, code:%d"
,
pCreate
->
meterId
,
code
);
}
...
...
src/mnode/src/mgmtSupertableQuery.c
浏览文件 @
31f8cf3f
...
...
@@ -23,6 +23,12 @@
#include "tsqlfunction.h"
#include "vnodeTagMgmt.h"
typedef
struct
SSyntaxTreeFilterSupporter
{
SSchema
*
pTagSchema
;
int32_t
numOfTags
;
int32_t
optr
;
}
SSyntaxTreeFilterSupporter
;
typedef
struct
SJoinSupporter
{
void
**
val
;
void
**
pTabObjs
;
...
...
@@ -41,6 +47,7 @@ typedef struct SMeterNameFilterSupporter {
}
SMeterNameFilterSupporter
;
static
void
tansformQueryResult
(
tQueryResultset
*
pRes
);
static
bool
tSkipListNodeFilterCallback
(
const
void
*
pNode
,
void
*
param
);
static
int32_t
tabObjVGIDComparator
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
STabObj
*
p1
=
*
(
STabObj
**
)
pLeft
;
...
...
@@ -82,7 +89,7 @@ static int32_t tabObjResultComparator(const void* p1, const void* p2, void* para
f1
=
pNode1
->
meterId
;
f2
=
pNode2
->
meterId
;
schema
.
type
=
TSDB_DATA_TYPE_BINARY
;
schema
.
bytes
=
TSDB_
METER
_ID_LEN
;
schema
.
bytes
=
TSDB_
TABLE
_ID_LEN
;
}
else
{
f1
=
mgmtTableGetTag
(
pNode1
,
colIdx
,
NULL
);
f2
=
mgmtTableGetTag
(
pNode2
,
colIdx
,
&
schema
);
...
...
@@ -104,15 +111,15 @@ static int32_t tabObjResultComparator(const void* p1, const void* p2, void* para
* update the tag order index according to the tags column index. The tags column index needs to be checked one-by-one,
* since the normal columns may be passed to server for handling the group by on status column.
*
* @param p
Metric
MetaMsg
* @param p
SuperTable
MetaMsg
* @param tableIndex
* @param pOrderIndexInfo
* @param numOfTags
*/
static
void
mgmtUpdateOrderTagColIndex
(
S
MetricMetaMsg
*
pMetric
MetaMsg
,
int32_t
tableIndex
,
tOrderIdx
*
pOrderIndexInfo
,
static
void
mgmtUpdateOrderTagColIndex
(
S
SuperTableMetaMsg
*
pSuperTable
MetaMsg
,
int32_t
tableIndex
,
tOrderIdx
*
pOrderIndexInfo
,
int32_t
numOfTags
)
{
SMetricMetaElemMsg
*
pElem
=
(
SMetricMetaElemMsg
*
)((
char
*
)
p
MetricMetaMsg
+
pMetric
MetaMsg
->
metaElem
[
tableIndex
]);
SColIndexEx
*
groupColumnList
=
(
SColIndexEx
*
)((
char
*
)
p
Metric
MetaMsg
+
pElem
->
groupbyTagColumnList
);
SMetricMetaElemMsg
*
pElem
=
(
SMetricMetaElemMsg
*
)((
char
*
)
p
SuperTableMetaMsg
+
pSuperTable
MetaMsg
->
metaElem
[
tableIndex
]);
SColIndexEx
*
groupColumnList
=
(
SColIndexEx
*
)((
char
*
)
p
SuperTable
MetaMsg
+
pElem
->
groupbyTagColumnList
);
int32_t
numOfGroupbyTags
=
0
;
for
(
int32_t
i
=
0
;
i
<
pElem
->
numOfGroupCols
;
++
i
)
{
...
...
@@ -127,12 +134,12 @@ static void mgmtUpdateOrderTagColIndex(SMetricMetaMsg* pMetricMetaMsg, int32_t t
}
// todo merge sort function with losertree used
void
mgmtReorganizeMetersInMetricMeta
(
S
MetricMetaMsg
*
pMetric
MetaMsg
,
int32_t
tableIndex
,
tQueryResultset
*
pRes
)
{
void
mgmtReorganizeMetersInMetricMeta
(
S
SuperTableMetaMsg
*
pSuperTable
MetaMsg
,
int32_t
tableIndex
,
tQueryResultset
*
pRes
)
{
if
(
pRes
->
num
<=
0
)
{
// no result, no need to pagination
return
;
}
SMetricMetaElemMsg
*
pElem
=
(
SMetricMetaElemMsg
*
)((
char
*
)
p
MetricMetaMsg
+
pMetric
MetaMsg
->
metaElem
[
tableIndex
]);
SMetricMetaElemMsg
*
pElem
=
(
SMetricMetaElemMsg
*
)((
char
*
)
p
SuperTableMetaMsg
+
pSuperTable
MetaMsg
->
metaElem
[
tableIndex
]);
STabObj
*
pMetric
=
mgmtGetTable
(
pElem
->
meterId
);
SSchema
*
pTagSchema
=
(
SSchema
*
)(
pMetric
->
schema
+
pMetric
->
numOfColumns
*
sizeof
(
SSchema
));
...
...
@@ -149,7 +156,7 @@ void mgmtReorganizeMetersInMetricMeta(SMetricMetaMsg* pMetricMetaMsg, int32_t ta
int32_t
*
startPos
=
NULL
;
int32_t
numOfSubset
=
1
;
mgmtUpdateOrderTagColIndex
(
p
Metric
MetaMsg
,
tableIndex
,
&
descriptor
->
orderIdx
,
pMetric
->
numOfTags
);
mgmtUpdateOrderTagColIndex
(
p
SuperTable
MetaMsg
,
tableIndex
,
&
descriptor
->
orderIdx
,
pMetric
->
numOfTags
);
if
(
descriptor
->
orderIdx
.
numOfOrderedCols
>
0
)
{
tQSortEx
(
pRes
->
pRes
,
POINTER_BYTES
,
0
,
pRes
->
num
-
1
,
descriptor
,
tabObjResultComparator
);
startPos
=
calculateSubGroup
(
pRes
->
pRes
,
pRes
->
num
,
&
numOfSubset
,
descriptor
,
tabObjResultComparator
);
...
...
@@ -193,7 +200,7 @@ static void mgmtRetrieveByMeterName(tQueryResultset* pRes, char* str, STabObj* p
}
/* not a table created from metric, ignore */
if
(
pMeterObj
->
meter
Type
!=
TSDB_TABLE_TYPE_CREATE_FROM_STABLE
)
{
if
(
pMeterObj
->
table
Type
!=
TSDB_TABLE_TYPE_CREATE_FROM_STABLE
)
{
continue
;
}
...
...
@@ -202,7 +209,7 @@ static void mgmtRetrieveByMeterName(tQueryResultset* pRes, char* str, STabObj* p
* uid, so compare according to meterid
*/
STabObj
*
parentMetric
=
mgmtGetTable
(
pMeterObj
->
pTagData
);
if
(
strncasecmp
(
parentMetric
->
meterId
,
pMetric
->
meterId
,
TSDB_
METER
_ID_LEN
)
!=
0
||
if
(
strncasecmp
(
parentMetric
->
meterId
,
pMetric
->
meterId
,
TSDB_
TABLE
_ID_LEN
)
!=
0
||
(
parentMetric
->
uid
!=
pMetric
->
uid
))
{
continue
;
}
...
...
@@ -214,13 +221,13 @@ static void mgmtRetrieveByMeterName(tQueryResultset* pRes, char* str, STabObj* p
static
bool
mgmtTablenameFilterCallback
(
tSkipListNode
*
pNode
,
void
*
param
)
{
SMeterNameFilterSupporter
*
pSupporter
=
(
SMeterNameFilterSupporter
*
)
param
;
char
name
[
TSDB_
METER
_ID_LEN
]
=
{
0
};
char
name
[
TSDB_
TABLE
_ID_LEN
]
=
{
0
};
// pattern compare for meter name
STabObj
*
pMeterObj
=
(
STabObj
*
)
pNode
->
pData
;
extractTableName
(
pMeterObj
->
meterId
,
name
);
return
patternMatch
(
pSupporter
->
pattern
,
name
,
TSDB_
METER
_ID_LEN
,
&
pSupporter
->
info
)
==
TSDB_PATTERN_MATCH
;
return
patternMatch
(
pSupporter
->
pattern
,
name
,
TSDB_
TABLE
_ID_LEN
,
&
pSupporter
->
info
)
==
TSDB_PATTERN_MATCH
;
}
static
void
mgmtRetrieveFromLikeOptr
(
tQueryResultset
*
pRes
,
const
char
*
str
,
STabObj
*
pMetric
)
{
...
...
@@ -282,8 +289,8 @@ UNUSED_FUNC static bool mgmtJoinFilterCallback(tSkipListNode* pNode, void* param
return
false
;
}
static
void
orderResult
(
S
MetricMetaMsg
*
pMetric
MetaMsg
,
tQueryResultset
*
pRes
,
int16_t
colIndex
,
int32_t
tableIndex
)
{
SMetricMetaElemMsg
*
pElem
=
(
SMetricMetaElemMsg
*
)((
char
*
)
p
MetricMetaMsg
+
pMetric
MetaMsg
->
metaElem
[
tableIndex
]);
static
void
orderResult
(
S
SuperTableMetaMsg
*
pSuperTable
MetaMsg
,
tQueryResultset
*
pRes
,
int16_t
colIndex
,
int32_t
tableIndex
)
{
SMetricMetaElemMsg
*
pElem
=
(
SMetricMetaElemMsg
*
)((
char
*
)
p
SuperTableMetaMsg
+
pSuperTable
MetaMsg
->
metaElem
[
tableIndex
]);
tOrderDescriptor
*
descriptor
=
(
tOrderDescriptor
*
)
calloc
(
1
,
sizeof
(
tOrderDescriptor
)
+
sizeof
(
int32_t
)
*
1
);
// only one column for join
...
...
@@ -322,13 +329,13 @@ static int32_t mgmtCheckForDuplicateTagValue(tQueryResultset* pRes, int32_t inde
return
TSDB_CODE_SUCCESS
;
}
int32_t
mgmtDoJoin
(
S
MetricMetaMsg
*
pMetric
MetaMsg
,
tQueryResultset
*
pRes
)
{
if
(
p
Metric
MetaMsg
->
numOfMeters
==
1
)
{
int32_t
mgmtDoJoin
(
S
SuperTableMetaMsg
*
pSuperTable
MetaMsg
,
tQueryResultset
*
pRes
)
{
if
(
p
SuperTable
MetaMsg
->
numOfMeters
==
1
)
{
return
TSDB_CODE_SUCCESS
;
}
bool
allEmpty
=
false
;
for
(
int32_t
i
=
0
;
i
<
p
Metric
MetaMsg
->
numOfMeters
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
p
SuperTable
MetaMsg
->
numOfMeters
;
++
i
)
{
if
(
pRes
[
i
].
num
==
0
)
{
// all results are empty if one of them is empty
allEmpty
=
true
;
break
;
...
...
@@ -336,7 +343,7 @@ int32_t mgmtDoJoin(SMetricMetaMsg* pMetricMetaMsg, tQueryResultset* pRes) {
}
if
(
allEmpty
)
{
for
(
int32_t
i
=
0
;
i
<
p
Metric
MetaMsg
->
numOfMeters
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
p
SuperTable
MetaMsg
->
numOfMeters
;
++
i
)
{
pRes
[
i
].
num
=
0
;
tfree
(
pRes
[
i
].
pRes
);
}
...
...
@@ -344,15 +351,15 @@ int32_t mgmtDoJoin(SMetricMetaMsg* pMetricMetaMsg, tQueryResultset* pRes) {
return
TSDB_CODE_SUCCESS
;
}
char
*
cond
=
(
char
*
)
p
MetricMetaMsg
+
pMetric
MetaMsg
->
join
;
char
*
cond
=
(
char
*
)
p
SuperTableMetaMsg
+
pSuperTable
MetaMsg
->
join
;
char
left
[
TSDB_
METER
_ID_LEN
+
1
]
=
{
0
};
char
left
[
TSDB_
TABLE
_ID_LEN
+
1
]
=
{
0
};
strcpy
(
left
,
cond
);
int16_t
leftTagColIndex
=
*
(
int16_t
*
)(
cond
+
TSDB_
METER
_ID_LEN
);
int16_t
leftTagColIndex
=
*
(
int16_t
*
)(
cond
+
TSDB_
TABLE
_ID_LEN
);
char
right
[
TSDB_
METER
_ID_LEN
+
1
]
=
{
0
};
strcpy
(
right
,
cond
+
TSDB_
METER
_ID_LEN
+
sizeof
(
int16_t
));
int16_t
rightTagColIndex
=
*
(
int16_t
*
)(
cond
+
TSDB_
METER
_ID_LEN
*
2
+
sizeof
(
int16_t
));
char
right
[
TSDB_
TABLE
_ID_LEN
+
1
]
=
{
0
};
strcpy
(
right
,
cond
+
TSDB_
TABLE
_ID_LEN
+
sizeof
(
int16_t
));
int16_t
rightTagColIndex
=
*
(
int16_t
*
)(
cond
+
TSDB_
TABLE
_ID_LEN
*
2
+
sizeof
(
int16_t
));
STabObj
*
pLeftMetric
=
mgmtGetTable
(
left
);
STabObj
*
pRightMetric
=
mgmtGetTable
(
right
);
...
...
@@ -361,7 +368,7 @@ int32_t mgmtDoJoin(SMetricMetaMsg* pMetricMetaMsg, tQueryResultset* pRes) {
int32_t
leftIndex
=
0
;
int32_t
rightIndex
=
0
;
for
(
int32_t
i
=
0
;
i
<
p
Metric
MetaMsg
->
numOfMeters
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
p
SuperTable
MetaMsg
->
numOfMeters
;
++
i
)
{
STabObj
*
pObj
=
(
STabObj
*
)
pRes
[
i
].
pRes
[
0
];
STabObj
*
pMetric1
=
mgmtGetTable
(
pObj
->
pTagData
);
if
(
pMetric1
==
pLeftMetric
)
{
...
...
@@ -371,8 +378,8 @@ int32_t mgmtDoJoin(SMetricMetaMsg* pMetricMetaMsg, tQueryResultset* pRes) {
}
}
orderResult
(
p
Metric
MetaMsg
,
&
pRes
[
leftIndex
],
leftTagColIndex
,
leftIndex
);
orderResult
(
p
Metric
MetaMsg
,
&
pRes
[
rightIndex
],
rightTagColIndex
,
rightIndex
);
orderResult
(
p
SuperTable
MetaMsg
,
&
pRes
[
leftIndex
],
leftTagColIndex
,
leftIndex
);
orderResult
(
p
SuperTable
MetaMsg
,
&
pRes
[
rightIndex
],
rightTagColIndex
,
rightIndex
);
int32_t
i
=
0
;
int32_t
j
=
0
;
...
...
@@ -727,7 +734,7 @@ static int32_t mgmtFilterMeterByIndex(STabObj* pMetric, tQueryResultset* pRes, c
return
TSDB_CODE_SUCCESS
;
}
int
mgmtRetrieveMetersFromMetric
(
SMetric
MetaMsg
*
pMsg
,
int32_t
tableIndex
,
tQueryResultset
*
pRes
)
{
int
32_t
mgmtRetrieveMetersFromSuperTable
(
SSuperTable
MetaMsg
*
pMsg
,
int32_t
tableIndex
,
tQueryResultset
*
pRes
)
{
SMetricMetaElemMsg
*
pElem
=
(
SMetricMetaElemMsg
*
)((
char
*
)
pMsg
+
pMsg
->
metaElem
[
tableIndex
]);
STabObj
*
pMetric
=
mgmtGetTable
(
pElem
->
meterId
);
char
*
pCond
=
NULL
;
...
...
@@ -806,11 +813,11 @@ int mgmtRetrieveMetersFromMetric(SMetricMetaMsg* pMsg, int32_t tableIndex, tQuer
}
// todo refactor!!!!!
static
char
*
getTagValueFromMeter
(
STabObj
*
p
Meter
,
int32_t
offset
,
int32_t
len
,
char
*
param
)
{
static
char
*
getTagValueFromMeter
(
STabObj
*
p
Table
,
int32_t
offset
,
int32_t
len
,
char
*
param
)
{
if
(
offset
==
TSDB_TBNAME_COLUMN_INDEX
)
{
extractTableName
(
p
Meter
->
meterId
,
param
);
extractTableName
(
p
Table
->
meterId
,
param
);
}
else
{
char
*
tags
=
p
Meter
->
pTagData
+
offset
+
TSDB_METER
_ID_LEN
;
// tag start position
char
*
tags
=
p
Table
->
pTagData
+
offset
+
TSDB_TABLE
_ID_LEN
;
// tag start position
memcpy
(
param
,
tags
,
len
);
// make sure the value is null-terminated string
}
...
...
@@ -820,11 +827,11 @@ static char* getTagValueFromMeter(STabObj* pMeter, int32_t offset, int32_t len,
bool
tSkipListNodeFilterCallback
(
const
void
*
pNode
,
void
*
param
)
{
tQueryInfo
*
pInfo
=
(
tQueryInfo
*
)
param
;
STabObj
*
p
Meter
=
(
STabObj
*
)(((
tSkipListNode
*
)
pNode
)
->
pData
);
STabObj
*
p
Table
=
(
STabObj
*
)(((
tSkipListNode
*
)
pNode
)
->
pData
);
char
buf
[
TSDB_MAX_TAGS_LEN
]
=
{
0
};
char
*
val
=
getTagValueFromMeter
(
p
Meter
,
pInfo
->
offset
,
pInfo
->
sch
.
bytes
,
buf
);
char
*
val
=
getTagValueFromMeter
(
p
Table
,
pInfo
->
offset
,
pInfo
->
sch
.
bytes
,
buf
);
int8_t
type
=
pInfo
->
sch
.
type
;
int32_t
ret
=
0
;
...
...
src/mnode/src/mgmtUtil.c
浏览文件 @
31f8cf3f
...
...
@@ -15,19 +15,16 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "mnode.h"
#include "mgmtUtil.h"
#include "textbuffer.h"
#include "tschemautil.h"
#include "tsqlfunction.h"
bool
mgmtTableCreateFromSuperTable
(
STabObj
*
pTableObj
)
{
return
pTableObj
->
meter
Type
==
TSDB_TABLE_TYPE_CREATE_FROM_STABLE
;
return
pTableObj
->
table
Type
==
TSDB_TABLE_TYPE_CREATE_FROM_STABLE
;
}
bool
mgmtIsSuperTable
(
STabObj
*
pTableObj
)
{
return
pTableObj
->
meter
Type
==
TSDB_TABLE_TYPE_SUPER_TABLE
;
return
pTableObj
->
table
Type
==
TSDB_TABLE_TYPE_SUPER_TABLE
;
}
bool
mgmtIsNormalTable
(
STabObj
*
pTableObj
)
{
...
...
@@ -37,7 +34,7 @@ bool mgmtIsNormalTable(STabObj* pTableObj) {
/**
* TODO: the tag offset value should be kept in memory to avoid dynamically calculating the value
*
* @param p
Meter
* @param p
Table
* @param col
* @param pTagColSchema
* @return
...
...
@@ -48,7 +45,7 @@ char* mgmtTableGetTag(STabObj* pTable, int32_t col, SSchema* pTagColSchema) {
}
STabObj
*
pSuperTable
=
mgmtGetTable
(
pTable
->
pTagData
);
int32_t
offset
=
mgmtGetTagsLength
(
pSuperTable
,
col
)
+
TSDB_
METER
_ID_LEN
;
int32_t
offset
=
mgmtGetTagsLength
(
pSuperTable
,
col
)
+
TSDB_
TABLE
_ID_LEN
;
assert
(
offset
>
0
);
if
(
pTagColSchema
!=
NULL
)
{
...
...
src/mnode/src/mgmtVgroup.c
浏览文件 @
31f8cf3f
...
...
@@ -168,13 +168,13 @@ SVgObj *mgmtCreateVgroup(SDbObj *pDb) {
}
int
mgmtDropVgroup
(
SDbObj
*
pDb
,
SVgObj
*
pVgroup
)
{
STabObj
*
p
Meter
;
STabObj
*
p
Table
;
if
(
pVgroup
->
numOfMeters
>
0
)
{
for
(
int
i
=
0
;
i
<
pDb
->
cfg
.
maxSessions
;
++
i
)
{
if
(
pVgroup
->
meterList
!=
NULL
)
{
p
Meter
=
pVgroup
->
meterList
[
i
];
if
(
p
Meter
)
mgmtDropMeter
(
pDb
,
pMeter
->
meterId
,
0
);
p
Table
=
pVgroup
->
meterList
[
i
];
if
(
p
Table
)
mgmtDropMeter
(
pDb
,
pTable
->
meterId
,
0
);
}
}
}
...
...
@@ -239,14 +239,14 @@ int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int
maxReplica
=
0
;
SVgObj
*
pVgroup
=
NULL
;
STabObj
*
p
Meter
=
NULL
;
STabObj
*
p
Table
=
NULL
;
if
(
pShow
->
payloadLen
>
0
)
{
p
Meter
=
mgmtGetTable
(
pShow
->
payload
);
if
(
NULL
==
p
Meter
)
{
p
Table
=
mgmtGetTable
(
pShow
->
payload
);
if
(
NULL
==
p
Table
)
{
return
TSDB_CODE_INVALID_METER_ID
;
}
pVgroup
=
mgmtGetVgroup
(
p
Meter
->
gid
.
vgId
);
pVgroup
=
mgmtGetVgroup
(
p
Table
->
gid
.
vgId
);
if
(
NULL
==
pVgroup
)
return
TSDB_CODE_INVALID_METER_ID
;
maxReplica
=
pVgroup
->
numOfVnodes
>
maxReplica
?
pVgroup
->
numOfVnodes
:
maxReplica
;
...
...
@@ -292,7 +292,7 @@ int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
if
(
NULL
==
p
Meter
)
{
if
(
NULL
==
p
Table
)
{
pShow
->
numOfRows
=
pDb
->
numOfVgroups
;
pShow
->
pNode
=
pDb
->
pHead
;
}
else
{
...
...
src/modules/monitor/src/monitorSystem.c
浏览文件 @
31f8cf3f
...
...
@@ -188,7 +188,7 @@ void dnodeBuildMonitorSql(char *sql, int cmd) {
snprintf
(
sql
,
SQL_LENGTH
,
"create table if not exists %s.slowquery(ts timestamp, username "
"binary(%d), created_time timestamp, time bigint, sql binary(%d))"
,
tsMonitorDbName
,
TSDB_
METER
_ID_LEN
,
TSDB_SHOW_SQL_LEN
);
tsMonitorDbName
,
TSDB_
TABLE
_ID_LEN
,
TSDB_SHOW_SQL_LEN
);
}
else
if
(
cmd
==
MONITOR_CMD_CREATE_TB_LOG
)
{
snprintf
(
sql
,
SQL_LENGTH
,
"create table if not exists %s.log(ts timestamp, level tinyint, "
...
...
src/rpc/src/trpc.c
浏览文件 @
31f8cf3f
...
...
@@ -722,7 +722,7 @@ int taosSetSecurityInfo(int chann, int sid, char *id, int spi, int encrypt, char
pConn->encrypt = encrypt;
memcpy(pConn->secret, pConn->secret, TSDB_KEY_LEN);
memcpy(pConn->cipheringKey, ckey, TSDB_KEY_LEN);
memcpy(pConn->meterId, id, TSDB_
METER
_ID_LEN);
memcpy(pConn->meterId, id, TSDB_
TABLE
_ID_LEN);
*/
return
-
1
;
}
...
...
src/sdb/src/hashstr.c
浏览文件 @
31f8cf3f
...
...
@@ -19,7 +19,7 @@
#define MAX_STR_LEN 40
typedef
struct
_str_node_t
{
char
string
[
TSDB_
METER
_ID_LEN
];
char
string
[
TSDB_
TABLE
_ID_LEN
];
int
hash
;
struct
_str_node_t
*
prev
;
struct
_str_node_t
*
next
;
...
...
src/vnode/detail/inc/vnode.h
浏览文件 @
31f8cf3f
...
...
@@ -158,7 +158,7 @@ typedef struct SColumn {
typedef
struct
_meter_obj
{
uint64_t
uid
;
char
meterId
[
TSDB_
METER
_ID_LEN
];
char
meterId
[
TSDB_
TABLE
_ID_LEN
];
int
sid
;
short
vnode
;
short
numOfColumns
;
...
...
src/vnode/detail/inc/vnodeRead.h
浏览文件 @
31f8cf3f
...
...
@@ -253,7 +253,7 @@ typedef struct SMeterQuerySupportObj {
typedef
struct
_qinfo
{
uint64_t
signature
;
int32_t
refCount
;
// QInfo reference count, when the value is 0, it can be released safely
char
user
[
TSDB_
METER
_ID_LEN
+
1
];
char
user
[
TSDB_
TABLE
_ID_LEN
+
1
];
char
sql
[
TSDB_SHOW_SQL_LEN
];
uint8_t
stream
;
uint16_t
port
;
...
...
src/vnode/detail/src/vnodeFile.c
浏览文件 @
31f8cf3f
...
...
@@ -492,7 +492,7 @@ void *vnodeCommitMultiToFile(SVnodeObj *pVnode, int ssid, int esid) {
SMeterObj
*
pObj
=
NULL
;
SCompInfo
compInfo
=
{
0
};
SCompHeader
*
pHeader
;
SMeterInfo
*
meterInfo
=
NULL
,
*
p
Meter
=
NULL
;
SMeterInfo
*
meterInfo
=
NULL
,
*
p
Table
=
NULL
;
SQuery
query
;
SColumnInfoEx
colList
[
TSDB_MAX_COLUMNS
]
=
{
0
};
SSqlFunctionExpr
pExprs
[
TSDB_MAX_COLUMNS
]
=
{
0
};
...
...
@@ -617,7 +617,7 @@ _again:
continue
;
}
p
Meter
=
meterInfo
+
sid
;
p
Table
=
meterInfo
+
sid
;
pHeader
=
((
SCompHeader
*
)
tmem
)
+
sid
;
if
(
pVnode
->
hfd
>
0
)
{
...
...
@@ -633,18 +633,18 @@ _again:
goto
_over
;
}
else
{
if
(
pObj
->
uid
==
compInfo
.
uid
)
{
p
Meter
->
oldNumOfBlocks
=
compInfo
.
numOfBlocks
;
p
Meter
->
oldCompBlockOffset
=
pHeader
->
compInfoOffset
+
sizeof
(
SCompInfo
);
p
Meter
->
last
=
compInfo
.
last
;
p
Table
->
oldNumOfBlocks
=
compInfo
.
numOfBlocks
;
p
Table
->
oldCompBlockOffset
=
pHeader
->
compInfoOffset
+
sizeof
(
SCompInfo
);
p
Table
->
last
=
compInfo
.
last
;
if
(
compInfo
.
numOfBlocks
>
maxOldBlocks
)
maxOldBlocks
=
compInfo
.
numOfBlocks
;
if
(
p
Meter
->
last
)
{
if
(
p
Table
->
last
)
{
lseek
(
pVnode
->
hfd
,
sizeof
(
SCompBlock
)
*
(
compInfo
.
numOfBlocks
-
1
),
SEEK_CUR
);
read
(
pVnode
->
hfd
,
&
p
Meter
->
lastBlock
,
sizeof
(
SCompBlock
));
read
(
pVnode
->
hfd
,
&
p
Table
->
lastBlock
,
sizeof
(
SCompBlock
));
}
}
else
{
dTrace
(
"vid:%d sid:%d id:%s, uid:%"
PRIu64
" is not matched with old:%"
PRIu64
", old data will be thrown away"
,
vnode
,
sid
,
pObj
->
meterId
,
pObj
->
uid
,
compInfo
.
uid
);
p
Meter
->
oldNumOfBlocks
=
0
;
p
Table
->
oldNumOfBlocks
=
0
;
}
}
}
else
{
...
...
@@ -669,8 +669,8 @@ _again:
pObj
->
pointsPerFileBlock
*
pObj
->
schema
[
col
-
1
].
bytes
+
EXTRA_BYTES
+
sizeof
(
TSCKSUM
));
}
p
Meter
=
meterInfo
+
sid
;
p
Meter
->
tempHeadOffset
=
headLen
;
p
Table
=
meterInfo
+
sid
;
p
Table
->
tempHeadOffset
=
headLen
;
memset
(
&
query
,
0
,
sizeof
(
query
));
query
.
colList
=
colList
;
...
...
@@ -690,27 +690,27 @@ _again:
pointsReadLast
=
0
;
// last block is at last file
if
(
p
Meter
->
last
)
{
if
((
p
Meter
->
lastBlock
.
sversion
!=
pObj
->
sversion
)
||
(
query
.
over
))
{
if
(
p
Table
->
last
)
{
if
((
p
Table
->
lastBlock
.
sversion
!=
pObj
->
sversion
)
||
(
query
.
over
))
{
// TODO : Check the correctness of this code. write the last block to
// .data file
pCompBlock
=
(
SCompBlock
*
)(
hmem
+
headLen
);
assert
(
dmem
-
(
char
*
)
pCompBlock
>=
sizeof
(
SCompBlock
));
*
pCompBlock
=
p
Meter
->
lastBlock
;
if
(
p
Meter
->
lastBlock
.
sversion
!=
pObj
->
sversion
)
{
*
pCompBlock
=
p
Table
->
lastBlock
;
if
(
p
Table
->
lastBlock
.
sversion
!=
pObj
->
sversion
)
{
pCompBlock
->
last
=
0
;
pCompBlock
->
offset
=
lseek
(
pVnode
->
dfd
,
0
,
SEEK_END
);
p
Meter
->
last
=
0
;
lseek
(
pVnode
->
lfd
,
p
Meter
->
lastBlock
.
offset
,
SEEK_SET
);
tsendfile
(
pVnode
->
dfd
,
pVnode
->
lfd
,
NULL
,
p
Meter
->
lastBlock
.
len
);
pVnode
->
dfSize
=
pCompBlock
->
offset
+
p
Meter
->
lastBlock
.
len
;
p
Table
->
last
=
0
;
lseek
(
pVnode
->
lfd
,
p
Table
->
lastBlock
.
offset
,
SEEK_SET
);
tsendfile
(
pVnode
->
dfd
,
pVnode
->
lfd
,
NULL
,
p
Table
->
lastBlock
.
len
);
pVnode
->
dfSize
=
pCompBlock
->
offset
+
p
Table
->
lastBlock
.
len
;
}
else
{
if
(
ssid
==
0
)
{
assert
(
pCompBlock
->
last
&&
pVnode
->
tfd
!=
-
1
);
pCompBlock
->
offset
=
lseek
(
pVnode
->
tfd
,
0
,
SEEK_END
);
lseek
(
pVnode
->
lfd
,
p
Meter
->
lastBlock
.
offset
,
SEEK_SET
);
tsendfile
(
pVnode
->
tfd
,
pVnode
->
lfd
,
NULL
,
p
Meter
->
lastBlock
.
len
);
pVnode
->
lfSize
=
pCompBlock
->
offset
+
p
Meter
->
lastBlock
.
len
;
lseek
(
pVnode
->
lfd
,
p
Table
->
lastBlock
.
offset
,
SEEK_SET
);
tsendfile
(
pVnode
->
tfd
,
pVnode
->
lfd
,
NULL
,
p
Table
->
lastBlock
.
len
);
pVnode
->
lfSize
=
pCompBlock
->
offset
+
p
Table
->
lastBlock
.
len
;
}
else
{
assert
(
pVnode
->
tfd
==
-
1
);
}
...
...
@@ -718,12 +718,12 @@ _again:
}
headLen
+=
sizeof
(
SCompBlock
);
p
Meter
->
newNumOfBlocks
++
;
p
Table
->
newNumOfBlocks
++
;
}
else
{
// read last block into memory
if
(
vnodeReadLastBlockToMem
(
pObj
,
&
p
Meter
->
lastBlock
,
data
)
<
0
)
goto
_over
;
p
Meter
->
last
=
0
;
pointsReadLast
=
p
Meter
->
lastBlock
.
numOfPoints
;
if
(
vnodeReadLastBlockToMem
(
pObj
,
&
p
Table
->
lastBlock
,
data
)
<
0
)
goto
_over
;
p
Table
->
last
=
0
;
pointsReadLast
=
p
Table
->
lastBlock
.
numOfPoints
;
query
.
over
=
0
;
headInfo
.
totalStorage
-=
(
pointsReadLast
*
pObj
->
bytesPerPoint
);
...
...
@@ -731,8 +731,8 @@ _again:
pObj
->
vnode
,
pObj
->
sid
,
pObj
->
meterId
,
pointsReadLast
);
}
p
Meter
->
changed
=
1
;
p
Meter
->
oldNumOfBlocks
--
;
p
Table
->
changed
=
1
;
p
Table
->
oldNumOfBlocks
--
;
}
while
(
query
.
over
==
0
)
{
...
...
@@ -753,17 +753,17 @@ _again:
pCompBlock
->
last
=
1
;
if
(
vnodeWriteBlockToFile
(
pObj
,
pCompBlock
,
data
,
cdata
,
pointsRead
)
<
0
)
goto
_over
;
if
(
pCompBlock
->
keyLast
>
pObj
->
lastKeyOnFile
)
pObj
->
lastKeyOnFile
=
pCompBlock
->
keyLast
;
p
Meter
->
last
=
pCompBlock
->
last
;
p
Table
->
last
=
pCompBlock
->
last
;
// write block info into header buffer
headLen
+=
sizeof
(
SCompBlock
);
p
Meter
->
newNumOfBlocks
++
;
p
Meter
->
committedPoints
+=
(
pointsRead
-
pointsReadLast
);
p
Table
->
newNumOfBlocks
++
;
p
Table
->
committedPoints
+=
(
pointsRead
-
pointsReadLast
);
dTrace
(
"vid:%d sid:%d id:%s, pointsRead:%d, pointsReadLast:%d lastKey:%"
PRId64
", "
"slot:%d pos:%d newNumOfBlocks:%d headLen:%d"
,
pObj
->
vnode
,
pObj
->
sid
,
pObj
->
meterId
,
pointsRead
,
pointsReadLast
,
pObj
->
lastKeyOnFile
,
query
.
slot
,
query
.
pos
,
p
Meter
->
newNumOfBlocks
,
headLen
);
p
Table
->
newNumOfBlocks
,
headLen
);
if
(
pointsRead
<
pObj
->
pointsPerFileBlock
||
query
.
keyIsMet
)
break
;
...
...
@@ -772,12 +772,12 @@ _again:
}
dTrace
(
"vid:%d sid:%d id:%s, %d points are committed, lastKey:%"
PRId64
" slot:%d pos:%d newNumOfBlocks:%d"
,
pObj
->
vnode
,
pObj
->
sid
,
pObj
->
meterId
,
p
Meter
->
committedPoints
,
pObj
->
lastKeyOnFile
,
query
.
slot
,
query
.
pos
,
p
Meter
->
newNumOfBlocks
);
pObj
->
vnode
,
pObj
->
sid
,
pObj
->
meterId
,
p
Table
->
committedPoints
,
pObj
->
lastKeyOnFile
,
query
.
slot
,
query
.
pos
,
p
Table
->
newNumOfBlocks
);
if
(
p
Meter
->
committedPoints
>
0
)
{
p
Meter
->
commitSlot
=
query
.
slot
;
p
Meter
->
commitPos
=
query
.
pos
;
if
(
p
Table
->
committedPoints
>
0
)
{
p
Table
->
commitSlot
=
query
.
slot
;
p
Table
->
commitPos
=
query
.
pos
;
}
TSKEY
nextKey
=
0
;
...
...
@@ -805,19 +805,19 @@ _again:
continue
;
}
p
Meter
=
meterInfo
+
sid
;
p
Meter
->
compInfoOffset
=
compInfoOffset
;
p
Meter
->
finalNumOfBlocks
=
pMeter
->
oldNumOfBlocks
+
pMeter
->
newNumOfBlocks
;
p
Table
=
meterInfo
+
sid
;
p
Table
->
compInfoOffset
=
compInfoOffset
;
p
Table
->
finalNumOfBlocks
=
pTable
->
oldNumOfBlocks
+
pTable
->
newNumOfBlocks
;
if
(
p
Meter
->
finalNumOfBlocks
>
0
)
{
pHeader
->
compInfoOffset
=
p
Meter
->
compInfoOffset
;
compInfoOffset
+=
sizeof
(
SCompInfo
)
+
p
Meter
->
finalNumOfBlocks
*
sizeof
(
SCompBlock
)
+
sizeof
(
TSCKSUM
);
if
(
p
Table
->
finalNumOfBlocks
>
0
)
{
pHeader
->
compInfoOffset
=
p
Table
->
compInfoOffset
;
compInfoOffset
+=
sizeof
(
SCompInfo
)
+
p
Table
->
finalNumOfBlocks
*
sizeof
(
SCompBlock
)
+
sizeof
(
TSCKSUM
);
}
else
{
pHeader
->
compInfoOffset
=
0
;
}
dTrace
(
"vid:%d sid:%d id:%s, oldBlocks:%d numOfBlocks:%d compInfoOffset:%d"
,
pObj
->
vnode
,
pObj
->
sid
,
pObj
->
meterId
,
p
Meter
->
oldNumOfBlocks
,
pMeter
->
finalNumOfBlocks
,
compInfoOffset
);
p
Table
->
oldNumOfBlocks
,
pTable
->
finalNumOfBlocks
,
compInfoOffset
);
}
// write the comp header into new file
...
...
@@ -838,16 +838,16 @@ _again:
pObj
=
(
SMeterObj
*
)(
pVnode
->
meterList
[
sid
]);
if
(
pObj
==
NULL
)
continue
;
p
Meter
=
meterInfo
+
sid
;
if
(
p
Meter
->
finalNumOfBlocks
<=
0
)
continue
;
p
Table
=
meterInfo
+
sid
;
if
(
p
Table
->
finalNumOfBlocks
<=
0
)
continue
;
compInfo
.
last
=
p
Meter
->
last
;
compInfo
.
last
=
p
Table
->
last
;
compInfo
.
uid
=
pObj
->
uid
;
compInfo
.
numOfBlocks
=
p
Meter
->
finalNumOfBlocks
;
/* compInfo.compBlockLen = p
Meter
->finalCompBlockLen; */
compInfo
.
numOfBlocks
=
p
Table
->
finalNumOfBlocks
;
/* compInfo.compBlockLen = p
Table
->finalCompBlockLen; */
compInfo
.
delimiter
=
TSDB_VNODE_DELIMITER
;
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)(
&
compInfo
),
sizeof
(
SCompInfo
));
lseek
(
pVnode
->
nfd
,
p
Meter
->
compInfoOffset
,
SEEK_SET
);
lseek
(
pVnode
->
nfd
,
p
Table
->
compInfoOffset
,
SEEK_SET
);
if
(
twrite
(
pVnode
->
nfd
,
&
compInfo
,
sizeof
(
compInfo
))
<=
0
)
{
dError
(
"vid:%d sid:%d id:%s, failed to write:%s, reason:%s"
,
vnode
,
sid
,
pObj
->
meterId
,
pVnode
->
nfn
,
strerror
(
errno
));
...
...
@@ -857,23 +857,23 @@ _again:
// write the old comp blocks
chksum
=
0
;
if
(
pVnode
->
hfd
&&
p
Meter
->
oldNumOfBlocks
)
{
lseek
(
pVnode
->
hfd
,
p
Meter
->
oldCompBlockOffset
,
SEEK_SET
);
if
(
p
Meter
->
changed
)
{
int
compBlockLen
=
p
Meter
->
oldNumOfBlocks
*
sizeof
(
SCompBlock
);
if
(
pVnode
->
hfd
&&
p
Table
->
oldNumOfBlocks
)
{
lseek
(
pVnode
->
hfd
,
p
Table
->
oldCompBlockOffset
,
SEEK_SET
);
if
(
p
Table
->
changed
)
{
int
compBlockLen
=
p
Table
->
oldNumOfBlocks
*
sizeof
(
SCompBlock
);
read
(
pVnode
->
hfd
,
pOldCompBlocks
,
compBlockLen
);
twrite
(
pVnode
->
nfd
,
pOldCompBlocks
,
compBlockLen
);
chksum
=
taosCalcChecksum
(
0
,
pOldCompBlocks
,
compBlockLen
);
}
else
{
tsendfile
(
pVnode
->
nfd
,
pVnode
->
hfd
,
NULL
,
p
Meter
->
oldNumOfBlocks
*
sizeof
(
SCompBlock
));
tsendfile
(
pVnode
->
nfd
,
pVnode
->
hfd
,
NULL
,
p
Table
->
oldNumOfBlocks
*
sizeof
(
SCompBlock
));
read
(
pVnode
->
hfd
,
&
chksum
,
sizeof
(
TSCKSUM
));
}
}
if
(
p
Meter
->
newNumOfBlocks
)
{
chksum
=
taosCalcChecksum
(
chksum
,
(
uint8_t
*
)(
hmem
+
p
Meter
->
tempHeadOffset
),
p
Meter
->
newNumOfBlocks
*
sizeof
(
SCompBlock
));
if
(
twrite
(
pVnode
->
nfd
,
hmem
+
p
Meter
->
tempHeadOffset
,
pMeter
->
newNumOfBlocks
*
sizeof
(
SCompBlock
))
<=
0
)
{
if
(
p
Table
->
newNumOfBlocks
)
{
chksum
=
taosCalcChecksum
(
chksum
,
(
uint8_t
*
)(
hmem
+
p
Table
->
tempHeadOffset
),
p
Table
->
newNumOfBlocks
*
sizeof
(
SCompBlock
));
if
(
twrite
(
pVnode
->
nfd
,
hmem
+
p
Table
->
tempHeadOffset
,
pTable
->
newNumOfBlocks
*
sizeof
(
SCompBlock
))
<=
0
)
{
dError
(
"vid:%d sid:%d id:%s, failed to write:%s, reason:%s"
,
vnode
,
sid
,
pObj
->
meterId
,
pVnode
->
nfn
,
strerror
(
errno
));
vnodeRecoverFromPeer
(
pVnode
,
pVnode
->
commitFileId
);
...
...
@@ -891,11 +891,11 @@ _again:
pObj
=
(
SMeterObj
*
)(
pVnode
->
meterList
[
sid
]);
if
(
pObj
==
NULL
)
continue
;
p
Meter
=
meterInfo
+
sid
;
if
(
p
Meter
->
finalNumOfBlocks
<=
0
)
continue
;
p
Table
=
meterInfo
+
sid
;
if
(
p
Table
->
finalNumOfBlocks
<=
0
)
continue
;
if
(
p
Meter
->
committedPoints
>
0
)
{
vnodeUpdateCommitInfo
(
pObj
,
p
Meter
->
commitSlot
,
pMeter
->
commitPos
,
pMeter
->
commitCount
);
if
(
p
Table
->
committedPoints
>
0
)
{
vnodeUpdateCommitInfo
(
pObj
,
p
Table
->
commitSlot
,
pTable
->
commitPos
,
pTable
->
commitCount
);
}
}
...
...
src/vnode/detail/src/vnodeImport.c
浏览文件 @
31f8cf3f
...
...
@@ -1236,12 +1236,12 @@ _error_merge:
} \
}
int
isCacheEnd
(
SBlockIter
iter
,
SMeterObj
*
p
Meter
)
{
SCacheInfo
*
pInfo
=
(
SCacheInfo
*
)(
p
Meter
->
pCache
);
int
isCacheEnd
(
SBlockIter
iter
,
SMeterObj
*
p
Table
)
{
SCacheInfo
*
pInfo
=
(
SCacheInfo
*
)(
p
Table
->
pCache
);
int
slot
=
0
;
int
pos
=
0
;
if
(
pInfo
->
cacheBlocks
[
pInfo
->
currentSlot
]
->
numOfPoints
==
p
Meter
->
pointsPerBlock
)
{
if
(
pInfo
->
cacheBlocks
[
pInfo
->
currentSlot
]
->
numOfPoints
==
p
Table
->
pointsPerBlock
)
{
slot
=
(
pInfo
->
currentSlot
+
1
)
%
(
pInfo
->
maxBlocks
);
pos
=
0
;
}
else
{
...
...
src/vnode/detail/src/vnodeQueryImpl.c
浏览文件 @
31f8cf3f
...
...
@@ -4506,7 +4506,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
}
// get one queried meter
SMeterObj
*
p
Meter
=
getMeterObj
(
pSupporter
->
pMetersHashTable
,
pSupporter
->
pSidSet
->
pSids
[
0
]
->
sid
);
SMeterObj
*
p
Table
=
getMeterObj
(
pSupporter
->
pMetersHashTable
,
pSupporter
->
pSidSet
->
pSids
[
0
]
->
sid
);
pRuntimeEnv
->
pTSBuf
=
param
;
pRuntimeEnv
->
cur
.
vnodeIndex
=
-
1
;
...
...
@@ -4517,18 +4517,18 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
tsBufSetTraverseOrder
(
pRuntimeEnv
->
pTSBuf
,
order
);
}
int32_t
ret
=
setupQueryRuntimeEnv
(
p
Meter
,
pQuery
,
&
pSupporter
->
runtimeEnv
,
pTagSchema
,
TSQL_SO_ASC
,
true
);
int32_t
ret
=
setupQueryRuntimeEnv
(
p
Table
,
pQuery
,
&
pSupporter
->
runtimeEnv
,
pTagSchema
,
TSQL_SO_ASC
,
true
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
ret
;
}
ret
=
allocateRuntimeEnvBuf
(
pRuntimeEnv
,
p
Meter
);
ret
=
allocateRuntimeEnvBuf
(
pRuntimeEnv
,
p
Table
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
ret
;
}
tSidSetSort
(
pSupporter
->
pSidSet
);
vnodeRecordAllFiles
(
pQInfo
,
p
Meter
->
vnode
);
vnodeRecordAllFiles
(
pQInfo
,
p
Table
->
vnode
);
if
((
ret
=
allocateOutputBufForGroup
(
pSupporter
,
pQuery
,
true
))
!=
TSDB_CODE_SUCCESS
)
{
return
ret
;
...
...
@@ -4595,12 +4595,12 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
}
else
{
int32_t
num
=
0
;
for
(
int32_t
i
=
0
;
i
<
pSupporter
->
numOfMeters
;
++
i
)
{
SMeterObj
*
p
Meter
=
getMeterObj
(
pSupporter
->
pMetersHashTable
,
pSupporter
->
pSidSet
->
pSids
[
i
]
->
sid
);
atomic_fetch_sub_32
(
&
(
p
Meter
->
numOfQueries
),
1
);
SMeterObj
*
p
Table
=
getMeterObj
(
pSupporter
->
pMetersHashTable
,
pSupporter
->
pSidSet
->
pSids
[
i
]
->
sid
);
atomic_fetch_sub_32
(
&
(
p
Table
->
numOfQueries
),
1
);
if
(
p
Meter
->
numOfQueries
>
0
)
{
dTrace
(
"QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d"
,
pQInfo
,
p
Meter
->
vnode
,
pMeter
->
sid
,
p
Meter
->
meterId
,
pMeter
->
numOfQueries
);
if
(
p
Table
->
numOfQueries
>
0
)
{
dTrace
(
"QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d"
,
pQInfo
,
p
Table
->
vnode
,
pTable
->
sid
,
p
Table
->
meterId
,
pTable
->
numOfQueries
);
num
++
;
}
}
...
...
src/vnode/detail/src/vnodeStream.c
浏览文件 @
31f8cf3f
...
...
@@ -186,15 +186,15 @@ void vnodeUpdateStreamRole(SVnodeObj *pVnode) {
// Callback function called from client
void
vnodeCloseStreamCallback
(
void
*
param
)
{
SMeterObj
*
p
Meter
=
(
SMeterObj
*
)
param
;
SMeterObj
*
p
Table
=
(
SMeterObj
*
)
param
;
SVnodeObj
*
pVnode
=
NULL
;
if
(
p
Meter
==
NULL
||
pMeter
->
sqlLen
==
0
)
return
;
pVnode
=
vnodeList
+
p
Meter
->
vnode
;
if
(
p
Table
==
NULL
||
pTable
->
sqlLen
==
0
)
return
;
pVnode
=
vnodeList
+
p
Table
->
vnode
;
p
Meter
->
sqlLen
=
0
;
p
Meter
->
pSql
=
NULL
;
p
Meter
->
pStream
=
NULL
;
p
Table
->
sqlLen
=
0
;
p
Table
->
pSql
=
NULL
;
p
Table
->
pStream
=
NULL
;
pVnode
->
numOfStreams
--
;
...
...
@@ -203,5 +203,5 @@ void vnodeCloseStreamCallback(void *param) {
pVnode
->
dbConn
=
NULL
;
}
vnodeSaveMeterObjToFile
(
p
Meter
);
vnodeSaveMeterObjToFile
(
p
Table
);
}
\ No newline at end of file
src/vnode/detail/src/vnodeUtil.c
浏览文件 @
31f8cf3f
...
...
@@ -527,7 +527,7 @@ bool vnodeIsProjectionQuery(SSqlFunctionExpr* pExpr, int32_t numOfOutput) {
}
/*
* the p
Meter
->state may be changed by vnodeIsSafeToDeleteMeter and import/update processor, the check of
* the p
Table
->state may be changed by vnodeIsSafeToDeleteMeter and import/update processor, the check of
* the state will not always be correct.
*
* The import/update/deleting is actually blocked by current query processing if the check of meter state is
...
...
@@ -548,30 +548,30 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterSidExtInfo** pSid
int32_t
code
=
TSDB_CODE_SUCCESS
;
for
(
int32_t
i
=
0
;
i
<
pQueryMsg
->
numOfSids
;
++
i
)
{
SMeterObj
*
p
Meter
=
pVnode
->
meterList
[
pSids
[
i
]
->
sid
];
SMeterObj
*
p
Table
=
pVnode
->
meterList
[
pSids
[
i
]
->
sid
];
/*
* If table is missing or is in dropping status, config it from management node, and ignore it
* during query processing. The error code of TSDB_CODE_NOT_ACTIVE_TABLE will never return to client.
* The missing table needs to be removed from pSids list
*/
if
(
p
Meter
==
NULL
||
vnodeIsMeterState
(
pMeter
,
TSDB_METER_STATE_DROPPING
))
{
if
(
p
Table
==
NULL
||
vnodeIsMeterState
(
pTable
,
TSDB_METER_STATE_DROPPING
))
{
dWarn
(
"qmsg:%p, vid:%d sid:%d, not there or will be dropped, ignore this table in query"
,
pQueryMsg
,
pQueryMsg
->
vnode
,
pSids
[
i
]
->
sid
);
vnodeSendMeterCfgMsg
(
pQueryMsg
->
vnode
,
pSids
[
i
]
->
sid
);
continue
;
}
else
if
(
p
Meter
->
uid
!=
pSids
[
i
]
->
uid
||
pMeter
->
sid
!=
pSids
[
i
]
->
sid
)
{
}
else
if
(
p
Table
->
uid
!=
pSids
[
i
]
->
uid
||
pTable
->
sid
!=
pSids
[
i
]
->
sid
)
{
code
=
TSDB_CODE_TABLE_ID_MISMATCH
;
dError
(
"qmsg:%p, vid:%d sid:%d id:%s uid:%"
PRIu64
", id mismatch. sid:%d uid:%"
PRId64
" in msg"
,
pQueryMsg
,
pQueryMsg
->
vnode
,
p
Meter
->
sid
,
pMeter
->
meterId
,
pMeter
->
uid
,
pSids
[
i
]
->
sid
,
pSids
[
i
]
->
uid
);
pQueryMsg
->
vnode
,
p
Table
->
sid
,
pTable
->
meterId
,
pTable
->
uid
,
pSids
[
i
]
->
sid
,
pSids
[
i
]
->
uid
);
vnodeSendMeterCfgMsg
(
pQueryMsg
->
vnode
,
pSids
[
i
]
->
sid
);
continue
;
}
else
if
(
p
Meter
->
state
>
TSDB_METER_STATE_INSERTING
)
{
//update or import
}
else
if
(
p
Table
->
state
>
TSDB_METER_STATE_INSERTING
)
{
//update or import
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
dTrace
(
"qmsg:%p, vid:%d sid:%d id:%s, it is in state:%s, wait!"
,
pQueryMsg
,
pQueryMsg
->
vnode
,
pSids
[
i
]
->
sid
,
p
Meter
->
meterId
,
taosGetTableStatusStr
(
pMeter
->
state
));
p
Table
->
meterId
,
taosGetTableStatusStr
(
pTable
->
state
));
continue
;
}
...
...
@@ -579,15 +579,15 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterSidExtInfo** pSid
* vnodeIsSafeToDeleteMeter will wait for this function complete, and then it can
* check if the numOfQueries is 0 or not.
*/
pMeterObjList
[(
*
numOfIncTables
)
++
]
=
p
Meter
;
atomic_fetch_add_32
(
&
p
Meter
->
numOfQueries
,
1
);
pMeterObjList
[(
*
numOfIncTables
)
++
]
=
p
Table
;
atomic_fetch_add_32
(
&
p
Table
->
numOfQueries
,
1
);
pSids
[
index
++
]
=
pSids
[
i
];
// output for meter more than one query executed
if
(
p
Meter
->
numOfQueries
>
1
)
{
dTrace
(
"qmsg:%p, vid:%d sid:%d id:%s, inc query ref, numOfQueries:%d"
,
pQueryMsg
,
p
Meter
->
vnode
,
pMeter
->
sid
,
p
Meter
->
meterId
,
pMeter
->
numOfQueries
);
if
(
p
Table
->
numOfQueries
>
1
)
{
dTrace
(
"qmsg:%p, vid:%d sid:%d id:%s, inc query ref, numOfQueries:%d"
,
pQueryMsg
,
p
Table
->
vnode
,
pTable
->
sid
,
p
Table
->
meterId
,
pTable
->
numOfQueries
);
num
++
;
}
}
...
...
@@ -605,14 +605,14 @@ void vnodeDecQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterObj** pMeterObjList,
int32_t
num
=
0
;
for
(
int32_t
i
=
0
;
i
<
numOfIncTables
;
++
i
)
{
SMeterObj
*
p
Meter
=
pMeterObjList
[
i
];
SMeterObj
*
p
Table
=
pMeterObjList
[
i
];
if
(
p
Meter
!=
NULL
)
{
// here, do not need to lock to perform operations
atomic_fetch_sub_32
(
&
p
Meter
->
numOfQueries
,
1
);
if
(
p
Table
!=
NULL
)
{
// here, do not need to lock to perform operations
atomic_fetch_sub_32
(
&
p
Table
->
numOfQueries
,
1
);
if
(
p
Meter
->
numOfQueries
>
0
)
{
dTrace
(
"qmsg:%p, vid:%d sid:%d id:%s dec query ref, numOfQueries:%d"
,
pQueryMsg
,
p
Meter
->
vnode
,
pMeter
->
sid
,
p
Meter
->
meterId
,
pMeter
->
numOfQueries
);
if
(
p
Table
->
numOfQueries
>
0
)
{
dTrace
(
"qmsg:%p, vid:%d sid:%d id:%s dec query ref, numOfQueries:%d"
,
pQueryMsg
,
p
Table
->
vnode
,
pTable
->
sid
,
p
Table
->
meterId
,
pTable
->
numOfQueries
);
num
++
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录