Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b2ad12cb
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
b2ad12cb
编写于
3月 01, 2022
作者:
D
dapan1121
提交者:
GitHub
3月 01, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #10465 from taosdata/feature/qnode
Feature/qnode
上级
ec8d5f04
125091ad
变更
31
展开全部
隐藏空白更改
内联
并排
Showing
31 changed file
with
2076 addition
and
1027 deletion
+2076
-1027
include/common/tep.h
include/common/tep.h
+7
-3
include/common/tmsgdef.h
include/common/tmsgdef.h
+1
-1
include/libs/function/function.h
include/libs/function/function.h
+10
-5
include/libs/scalar/scalar.h
include/libs/scalar/scalar.h
+10
-2
include/util/taoserror.h
include/util/taoserror.h
+10
-3
include/util/tdef.h
include/util/tdef.h
+15
-0
source/common/src/tep.c
source/common/src/tep.c
+40
-27
source/common/src/tmsg.c
source/common/src/tmsg.c
+4
-0
source/dnode/mgmt/impl/src/dndTransport.c
source/dnode/mgmt/impl/src/dndTransport.c
+1
-1
source/dnode/mnode/impl/inc/mndInfoSchema.h
source/dnode/mnode/impl/inc/mndInfoSchema.h
+45
-0
source/dnode/mnode/impl/inc/mndInt.h
source/dnode/mnode/impl/inc/mndInt.h
+1
-0
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+30
-25
source/dnode/mnode/impl/src/mndInfoSchema.c
source/dnode/mnode/impl/src/mndInfoSchema.c
+245
-0
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+15
-7
source/dnode/mnode/impl/src/mnode.c
source/dnode/mnode/impl/src/mnode.c
+13
-3
source/dnode/mnode/impl/test/stb/stb.cpp
source/dnode/mnode/impl/test/stb/stb.cpp
+1
-1
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+15
-5
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+91
-54
source/libs/catalog/test/catalogTests.cpp
source/libs/catalog/test/catalogTests.cpp
+6
-2
source/libs/nodes/src/nodesTraverseFuncs.c
source/libs/nodes/src/nodesTraverseFuncs.c
+6
-0
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+36
-9
source/libs/qcom/src/querymsg.c
source/libs/qcom/src/querymsg.c
+4
-4
source/libs/scalar/inc/filterInt.h
source/libs/scalar/inc/filterInt.h
+1
-0
source/libs/scalar/inc/sclInt.h
source/libs/scalar/inc/sclInt.h
+6
-3
source/libs/scalar/inc/sclvector.h
source/libs/scalar/inc/sclvector.h
+5
-1
source/libs/scalar/src/filter.c
source/libs/scalar/src/filter.c
+20
-11
source/libs/scalar/src/scalar.c
source/libs/scalar/src/scalar.c
+224
-98
source/libs/scalar/src/sclvector.c
source/libs/scalar/src/sclvector.c
+655
-574
source/libs/scalar/test/filter/filterTests.cpp
source/libs/scalar/test/filter/filterTests.cpp
+177
-64
source/libs/scalar/test/scalar/scalarTests.cpp
source/libs/scalar/test/scalar/scalarTests.cpp
+378
-124
source/util/src/terror.c
source/util/src/terror.c
+4
-0
未找到文件。
include/common/tep.h
浏览文件 @
b2ad12cb
...
...
@@ -79,9 +79,12 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u
}
}
#define colDataGetData(p1_, r_) \
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? (p1_)->pData + (p1_)->varmeta.offset[(r_)] \
: (p1_)->pData + ((r_) * (p1_)->info.bytes))
#define BitmapLen(_n) (((_n) + ((1<<NBIT)-1)) >> NBIT)
#define colDataGetData(p1_, r_) \
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? ((p1_)->pData + (p1_)->varmeta.offset[(r_)]) \
: ((p1_)->pData + ((r_) * (p1_)->info.bytes)))
int32_t
colDataAppend
(
SColumnInfoData
*
pColumnInfoData
,
uint32_t
currentRow
,
const
char
*
pData
,
bool
isNull
);
int32_t
colDataMergeCol
(
SColumnInfoData
*
pColumnInfoData
,
uint32_t
numOfRow1
,
const
SColumnInfoData
*
pSource
,
...
...
@@ -112,6 +115,7 @@ SSchema* blockDataExtractSchema(const SSDataBlock* pBlock, int32_t* numOfCols);
int32_t
blockDataSort
(
SSDataBlock
*
pDataBlock
,
SArray
*
pOrderInfo
,
bool
nullFirst
);
int32_t
blockDataSort_rv
(
SSDataBlock
*
pDataBlock
,
SArray
*
pOrderInfo
,
bool
nullFirst
);
int32_t
blockDataEnsureColumnCapacity
(
SColumnInfoData
*
pColumn
,
uint32_t
numOfRows
);
int32_t
blockDataEnsureCapacity
(
SSDataBlock
*
pDataBlock
,
uint32_t
numOfRows
);
void
blockDataClearup
(
SSDataBlock
*
pDataBlock
,
bool
hasVarCol
);
SSDataBlock
*
createOneDataBlock
(
const
SSDataBlock
*
pDataBlock
);
...
...
include/common/tmsgdef.h
浏览文件 @
b2ad12cb
...
...
@@ -127,7 +127,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_MND_CREATE_STB
,
"mnode-create-stb"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_ALTER_STB
,
"mnode-alter-stb"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_STB
,
"mnode-drop-stb"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_
STB_META
,
"mnode-stb
-meta"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_
TABLE_META
,
"mnode-table
-meta"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_VGROUP_LIST
,
"mnode-vgroup-list"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_KILL_QUERY
,
"mnode-kill-query"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_KILL_CONN
,
"mnode-kill-conn"
,
NULL
,
NULL
)
...
...
include/libs/function/function.h
浏览文件 @
b2ad12cb
...
...
@@ -229,11 +229,16 @@ typedef struct SAggFunctionInfo {
}
SAggFunctionInfo
;
typedef
struct
SScalarParam
{
void
*
data
;
bool
colData
;
int32_t
num
;
int32_t
type
;
int32_t
bytes
;
void
*
data
;
union
{
SColumnInfoData
*
columnData
;
void
*
data
;
}
orig
;
char
*
bitmap
;
bool
dataInBlock
;
int32_t
num
;
int32_t
type
;
int32_t
bytes
;
}
SScalarParam
;
typedef
struct
SScalarFunctionInfo
{
...
...
include/libs/scalar/scalar.h
浏览文件 @
b2ad12cb
...
...
@@ -25,9 +25,17 @@ extern "C" {
typedef
struct
SFilterInfo
SFilterInfo
;
/*
pNode will be freed in API;
*pRes need to freed in caller
*/
int32_t
scalarCalculateConstants
(
SNode
*
pNode
,
SNode
**
pRes
);
int32_t
scalarCalculate
(
SNode
*
pNode
,
SSDataBlock
*
pSrc
,
SScalarParam
*
pDst
);
/*
pDst need to freed in caller
*/
int32_t
scalarCalculate
(
SNode
*
pNode
,
SArray
*
pBlockList
,
SScalarParam
*
pDst
);
int32_t
scalarGetOperatorParamNum
(
EOperatorType
type
);
int32_t
scalarGenerateSetFromList
(
void
**
data
,
void
*
pNode
,
uint32_t
type
);
...
...
include/util/taoserror.h
浏览文件 @
b2ad12cb
...
...
@@ -13,13 +13,15 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_ERROR_H_
#define _TD_UTIL_ERROR_H_
#ifndef _TD_UTIL_
TAOS_
ERROR_H_
#define _TD_UTIL_
TAOS_
ERROR_H_
#ifdef __cplusplus
extern
"C"
{
#endif
// clang-format off
#define TAOS_DEF_ERROR_CODE(mod, code) ((int32_t)((0x80000000 | ((mod)<<16) | (code))))
#define TAOS_SYSTEM_ERROR(code) (0x80ff0000 | (code))
...
...
@@ -233,6 +235,9 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_COLUMN_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AD)
#define TSDB_CODE_MND_COLUMN_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AE)
// mnode-infoSchema
#define TSDB_CODE_MND_INVALID_INFOS_TBL TAOS_DEF_ERROR_CODE(0, 0x03B0)
// mnode-func
#define TSDB_CODE_MND_FUNC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03C0)
#define TSDB_CODE_MND_FUNC_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03C1)
...
...
@@ -263,6 +268,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_OFFSET_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03EA)
#define TSDB_CODE_MND_MQ_PLACEHOLDER TAOS_DEF_ERROR_CODE(0, 0x03F0)
// dnode
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400)
#define TSDB_CODE_DND_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x0401)
...
...
@@ -467,4 +474,4 @@ int32_t* taosGetErrno();
}
#endif
#endif
/*_TD_UTIL_ERROR_H_*/
#endif
/*_TD_UTIL_
TAOS_
ERROR_H_*/
include/util/tdef.h
浏览文件 @
b2ad12cb
...
...
@@ -94,6 +94,21 @@ extern const int32_t TYPE_BYTES[15];
#define TSDB_TIME_PRECISION_MICRO_STR "us"
#define TSDB_TIME_PRECISION_NANO_STR "ns"
#define TSDB_INFORMATION_SCHEMA_DB "information_schema"
#define TSDB_INS_TABLE_DNODES "dnodes"
#define TSDB_INS_TABLE_MNODES "mnodes"
#define TSDB_INS_TABLE_MODULES "modules"
#define TSDB_INS_TABLE_QNODES "qnodes"
#define TSDB_INS_TABLE_USER_DATABASE "user_database"
#define TSDB_INS_TABLE_USER_FUNCTIONS "user_functions"
#define TSDB_INS_TABLE_USER_INDEXES "user_indexes"
#define TSDB_INS_TABLE_USER_STABLES "user_stables"
#define TSDB_INS_TABLE_USER_STREAMS "user_streams"
#define TSDB_INS_TABLE_USER_TABLES "user_tables"
#define TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED "user_table_distributed"
#define TSDB_INS_TABLE_USER_USERS "user_users"
#define TSDB_INS_TABLE_VGROUPS "vgroups"
#define TSDB_TICK_PER_SECOND(precision) \
((int64_t)((precision) == TSDB_TIME_PRECISION_MILLI ? 1e3L \
: ((precision) == TSDB_TIME_PRECISION_MICRO ? 1e6L : 1e9L)))
...
...
source/common/src/tep.c
浏览文件 @
b2ad12cb
...
...
@@ -75,7 +75,6 @@ SEpSet getEpSet_s(SCorEpSet *pEpSet) {
return
ep
;
}
#define BitmapLen(_n) (((_n) + ((1<<NBIT)-1)) >> NBIT)
int32_t
colDataGetLength
(
const
SColumnInfoData
*
pColumnInfoData
,
int32_t
numOfRows
)
{
ASSERT
(
pColumnInfoData
!=
NULL
);
...
...
@@ -135,6 +134,7 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
}
else
{
char
*
p
=
pColumnInfoData
->
pData
+
pColumnInfoData
->
info
.
bytes
*
currentRow
;
switch
(
type
)
{
case
TSDB_DATA_TYPE_BOOL
:
{
*
(
bool
*
)
p
=
*
(
bool
*
)
pData
;
break
;}
case
TSDB_DATA_TYPE_TINYINT
:
case
TSDB_DATA_TYPE_UTINYINT
:
{
*
(
int8_t
*
)
p
=
*
(
int8_t
*
)
pData
;
break
;}
case
TSDB_DATA_TYPE_SMALLINT
:
...
...
@@ -144,6 +144,8 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
case
TSDB_DATA_TYPE_TIMESTAMP
:
case
TSDB_DATA_TYPE_BIGINT
:
case
TSDB_DATA_TYPE_UBIGINT
:
{
*
(
int64_t
*
)
p
=
*
(
int64_t
*
)
pData
;
break
;}
case
TSDB_DATA_TYPE_FLOAT
:
{
*
(
float
*
)
p
=
*
(
float
*
)
pData
;
break
;}
case
TSDB_DATA_TYPE_DOUBLE
:
{
*
(
double
*
)
p
=
*
(
double
*
)
pData
;
break
;}
default:
assert
(
0
);
}
...
...
@@ -1071,36 +1073,47 @@ void blockDataClearup(SSDataBlock* pDataBlock, bool hasVarCol) {
}
}
int32_t
blockDataEnsureCapacity
(
SSDataBlock
*
pDataBlock
,
uint32_t
numOfRows
)
{
for
(
int32_t
i
=
0
;
i
<
pDataBlock
->
info
.
numOfCols
;
++
i
)
{
SColumnInfoData
*
p
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
i
);
if
(
IS_VAR_DATA_TYPE
(
p
->
info
.
type
))
{
char
*
tmp
=
realloc
(
p
->
varmeta
.
offset
,
sizeof
(
int32_t
)
*
numOfRows
);
if
(
tmp
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
int32_t
blockDataEnsureColumnCapacity
(
SColumnInfoData
*
pColumn
,
uint32_t
numOfRows
)
{
if
(
IS_VAR_DATA_TYPE
(
pColumn
->
info
.
type
))
{
char
*
tmp
=
realloc
(
pColumn
->
varmeta
.
offset
,
sizeof
(
int32_t
)
*
numOfRows
);
if
(
tmp
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
p
->
varmeta
.
offset
=
(
int32_t
*
)
tmp
;
memset
(
p
->
varmeta
.
offset
,
0
,
sizeof
(
int32_t
)
*
numOfRows
);
pColumn
->
varmeta
.
offset
=
(
int32_t
*
)
tmp
;
memset
(
pColumn
->
varmeta
.
offset
,
0
,
sizeof
(
int32_t
)
*
numOfRows
);
p
->
varmeta
.
length
=
0
;
p
->
varmeta
.
allocLen
=
0
;
tfree
(
p
->
pData
);
}
else
{
char
*
tmp
=
realloc
(
p
->
nullbitmap
,
BitmapLen
(
numOfRows
));
if
(
tmp
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pColumn
->
varmeta
.
length
=
0
;
pColumn
->
varmeta
.
allocLen
=
0
;
tfree
(
pColumn
->
pData
);
}
else
{
char
*
tmp
=
realloc
(
pColumn
->
nullbitmap
,
BitmapLen
(
numOfRows
));
if
(
tmp
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
p
->
nullbitmap
=
tmp
;
memset
(
p
->
nullbitmap
,
0
,
BitmapLen
(
numOfRows
));
pColumn
->
nullbitmap
=
tmp
;
memset
(
pColumn
->
nullbitmap
,
0
,
BitmapLen
(
numOfRows
));
tmp
=
realloc
(
p
->
pData
,
numOfRows
*
p
->
info
.
bytes
);
if
(
tmp
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
tmp
=
realloc
(
pColumn
->
pData
,
numOfRows
*
pColumn
->
info
.
bytes
);
if
(
tmp
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
p
->
pData
=
tmp
;
pColumn
->
pData
=
tmp
;
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
blockDataEnsureCapacity
(
SSDataBlock
*
pDataBlock
,
uint32_t
numOfRows
)
{
int32_t
code
=
0
;
for
(
int32_t
i
=
0
;
i
<
pDataBlock
->
info
.
numOfCols
;
++
i
)
{
SColumnInfoData
*
p
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
i
);
code
=
blockDataEnsureColumnCapacity
(
p
,
numOfRows
);
if
(
code
)
{
return
code
;
}
}
...
...
@@ -1149,4 +1162,4 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) {
size_t
blockDataGetCapacityInRow
(
const
SSDataBlock
*
pBlock
,
size_t
pageSize
)
{
return
pageSize
/
(
blockDataGetSerialRowSize
(
pBlock
)
+
blockDataGetSerialMetaSize
(
pBlock
));
}
\ No newline at end of file
}
source/common/src/tmsg.c
浏览文件 @
b2ad12cb
...
...
@@ -1524,6 +1524,10 @@ int32_t tDeserializeSUseDbRspImp(SCoder *pDecoder, SUseDbRsp *pRsp) {
if
(
tDecodeI32
(
pDecoder
,
&
pRsp
->
vgNum
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pRsp
->
hashMethod
)
<
0
)
return
-
1
;
if
(
pRsp
->
vgNum
<=
0
)
{
return
0
;
}
pRsp
->
pVgroupInfos
=
taosArrayInit
(
pRsp
->
vgNum
,
sizeof
(
SVgroupInfo
));
if
(
pRsp
->
pVgroupInfos
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
source/dnode/mgmt/impl/src/dndTransport.c
浏览文件 @
b2ad12cb
...
...
@@ -95,7 +95,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_STB
)]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_ALTER_STB
)]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_STB
)]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_
STB
_META
)]
=
dndProcessMnodeReadMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_
TABLE
_META
)]
=
dndProcessMnodeReadMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_VGROUP_LIST
)]
=
dndProcessMnodeReadMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_KILL_QUERY
)]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_KILL_CONN
)]
=
dndProcessMnodeWriteMsg
;
...
...
source/dnode/mnode/impl/inc/mndInfoSchema.h
0 → 100644
浏览文件 @
b2ad12cb
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_MND_INFO_SCHEMA_H_
#define _TD_MND_INFO_SCHEMA_H_
#include "mndInt.h"
#ifdef __cplusplus
extern
"C"
{
#endif
typedef
struct
SInfosTableSchema
{
char
*
name
;
int32_t
type
;
int32_t
bytes
;
}
SInfosTableSchema
;
typedef
struct
SInfosTableMeta
{
char
*
name
;
const
SInfosTableSchema
*
schema
;
int32_t
colNum
;
}
SInfosTableMeta
;
int32_t
mndBuildInsTableSchema
(
SMnode
*
pMnode
,
const
char
*
dbFName
,
const
char
*
tbName
,
STableMetaRsp
*
pRsp
);
int32_t
mndInitInfos
(
SMnode
*
pMnode
);
void
mndCleanupInfos
(
SMnode
*
pMnode
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_MND_INFO_SCHEMA_H_*/
source/dnode/mnode/impl/inc/mndInt.h
浏览文件 @
b2ad12cb
...
...
@@ -90,6 +90,7 @@ typedef struct SMnode {
SProfileMgmt
profileMgmt
;
STelemMgmt
telemMgmt
;
SSyncMgmt
syncMgmt
;
SHashObj
*
infosMeta
;
MndMsgFp
msgFp
[
TDMT_MAX
];
SendReqToDnodeFp
sendReqToDnodeFp
;
SendReqToMnodeFp
sendReqToMnodeFp
;
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
b2ad12cb
...
...
@@ -937,36 +937,41 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
goto
USE_DB_OVER
;
}
pDb
=
mndAcquireDb
(
pMnode
,
usedbReq
.
db
);
if
(
pDb
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DB_NOT_EXIST
;
goto
USE_DB_OVER
;
}
char
*
p
=
strchr
(
usedbReq
.
db
,
'.'
);
if
(
p
&&
0
==
strcmp
(
p
+
1
,
TSDB_INFORMATION_SCHEMA_DB
))
{
memcpy
(
usedbRsp
.
db
,
usedbReq
.
db
,
TSDB_DB_FNAME_LEN
);
}
else
{
pDb
=
mndAcquireDb
(
pMnode
,
usedbReq
.
db
);
if
(
pDb
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DB_NOT_EXIST
;
goto
USE_DB_OVER
;
}
pUser
=
mndAcquireUser
(
pMnode
,
pReq
->
user
);
if
(
pUser
==
NULL
)
{
goto
USE_DB_OVER
;
}
pUser
=
mndAcquireUser
(
pMnode
,
pReq
->
user
);
if
(
pUser
==
NULL
)
{
goto
USE_DB_OVER
;
}
if
(
mndCheckUseDbAuth
(
pUser
,
pDb
)
!=
0
)
{
goto
USE_DB_OVER
;
}
if
(
mndCheckUseDbAuth
(
pUser
,
pDb
)
!=
0
)
{
goto
USE_DB_OVER
;
}
usedbRsp
.
pVgroupInfos
=
taosArrayInit
(
pDb
->
cfg
.
numOfVgroups
,
sizeof
(
SVgroupInfo
));
if
(
usedbRsp
.
pVgroupInfos
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
USE_DB_OVER
;
}
usedbRsp
.
pVgroupInfos
=
taosArrayInit
(
pDb
->
cfg
.
numOfVgroups
,
sizeof
(
SVgroupInfo
));
if
(
usedbRsp
.
pVgroupInfos
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
USE_DB_OVER
;
}
if
(
usedbReq
.
vgVersion
<
pDb
->
vgVersion
)
{
mndBuildDBVgroupInfo
(
pDb
,
pMnode
,
usedbRsp
.
pVgroupInfos
);
}
if
(
usedbReq
.
vgVersion
<
pDb
->
vgVersion
)
{
mndBuildDBVgroupInfo
(
pDb
,
pMnode
,
usedbRsp
.
pVgroupInfos
);
}
memcpy
(
usedbRsp
.
db
,
pDb
->
name
,
TSDB_DB_FNAME_LEN
);
usedbRsp
.
uid
=
pDb
->
uid
;
usedbRsp
.
vgVersion
=
pDb
->
vgVersion
;
usedbRsp
.
vgNum
=
taosArrayGetSize
(
usedbRsp
.
pVgroupInfos
);
usedbRsp
.
hashMethod
=
pDb
->
hashMethod
;
memcpy
(
usedbRsp
.
db
,
pDb
->
name
,
TSDB_DB_FNAME_LEN
);
usedbRsp
.
uid
=
pDb
->
uid
;
usedbRsp
.
vgVersion
=
pDb
->
vgVersion
;
usedbRsp
.
vgNum
=
taosArrayGetSize
(
usedbRsp
.
pVgroupInfos
);
usedbRsp
.
hashMethod
=
pDb
->
hashMethod
;
}
int32_t
contLen
=
tSerializeSUseDbRsp
(
NULL
,
0
,
&
usedbRsp
);
void
*
pRsp
=
rpcMallocCont
(
contLen
);
...
...
source/dnode/mnode/impl/src/mndInfoSchema.c
0 → 100644
浏览文件 @
b2ad12cb
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mndInfoSchema.h"
static
const
SInfosTableSchema
dnodesSchema
[]
=
{{.
name
=
"id"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"end_point"
,
.
bytes
=
134
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"vnodes"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"cores"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"status"
,
.
bytes
=
10
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"role"
,
.
bytes
=
10
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"create_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
},
{.
name
=
"offline_reason"
,
.
bytes
=
256
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
};
static
const
SInfosTableSchema
mnodesSchema
[]
=
{{.
name
=
"id"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"end_point"
,
.
bytes
=
134
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"role"
,
.
bytes
=
10
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"role_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
},
{.
name
=
"create_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
},
};
static
const
SInfosTableSchema
modulesSchema
[]
=
{{.
name
=
"id"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"end_point"
,
.
bytes
=
134
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"module"
,
.
bytes
=
10
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
};
static
const
SInfosTableSchema
qnodesSchema
[]
=
{{.
name
=
"id"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"end_point"
,
.
bytes
=
134
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"create_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
},
};
static
const
SInfosTableSchema
userDBSchema
[]
=
{{.
name
=
"name"
,
.
bytes
=
32
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"created_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
},
{.
name
=
"ntables"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"vgroups"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"replica"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"quorum"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"days"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"keep"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"cache"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"blocks"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"minrows"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"maxrows"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"wallevel"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"fsync"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"comp"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"cachelast"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"precision"
,
.
bytes
=
2
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"status"
,
.
bytes
=
10
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
};
static
const
SInfosTableSchema
userFuncSchema
[]
=
{{.
name
=
"name"
,
.
bytes
=
32
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"created_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
},
{.
name
=
"ntables"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"precision"
,
.
bytes
=
2
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"status"
,
.
bytes
=
10
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
};
static
const
SInfosTableSchema
userIdxSchema
[]
=
{{.
name
=
"table_database"
,
.
bytes
=
32
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"table_name"
,
.
bytes
=
192
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"index_database"
,
.
bytes
=
32
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"index_name"
,
.
bytes
=
192
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"column_name"
,
.
bytes
=
64
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"index_type"
,
.
bytes
=
10
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"index_extensions"
,
.
bytes
=
256
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
};
static
const
SInfosTableSchema
userStbsSchema
[]
=
{{.
name
=
"db_name"
,
.
bytes
=
32
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"stable_name"
,
.
bytes
=
192
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"created_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
},
{.
name
=
"columns"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"tags"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"tables"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
};
static
const
SInfosTableSchema
userStreamsSchema
[]
=
{{.
name
=
"stream_name"
,
.
bytes
=
192
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"user_name"
,
.
bytes
=
23
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"dest_table"
,
.
bytes
=
192
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"created_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
},
{.
name
=
"sql"
,
.
bytes
=
1024
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
};
static
const
SInfosTableSchema
userTblsSchema
[]
=
{{.
name
=
"db_name"
,
.
bytes
=
32
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"table_name"
,
.
bytes
=
192
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"created_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
},
{.
name
=
"columns"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"stable_name"
,
.
bytes
=
192
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"tid"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_BIGINT
},
{.
name
=
"vg_id"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
};
static
const
SInfosTableSchema
userTblDistSchema
[]
=
{{.
name
=
"db_name"
,
.
bytes
=
32
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"table_name"
,
.
bytes
=
192
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"distributed_histogram"
,
.
bytes
=
500
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"min_of_rows"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"max_of_rows"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"avg_of_rows"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"stddev_of_rows"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"rows"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_BIGINT
},
{.
name
=
"blocks"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"storage_size"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_BIGINT
},
{.
name
=
"compression_ratio"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_DOUBLE
},
{.
name
=
"rows_in_mem"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"seek_header_time"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
};
static
const
SInfosTableSchema
userUsersSchema
[]
=
{{.
name
=
"user_name"
,
.
bytes
=
23
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"privilege"
,
.
bytes
=
256
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"create_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
},
};
static
const
SInfosTableSchema
vgroupsSchema
[]
=
{{.
name
=
"vg_id"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"tables"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"status"
,
.
bytes
=
10
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"onlines"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"v1_dnode"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"v1_status"
,
.
bytes
=
10
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"v2_dnode"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"v2_status"
,
.
bytes
=
10
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"v3_dnode"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"v3_status"
,
.
bytes
=
10
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"compacting"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
};
static
const
SInfosTableMeta
infosMeta
[]
=
{{
TSDB_INS_TABLE_DNODES
,
dnodesSchema
,
tListLen
(
dnodesSchema
)},
{
TSDB_INS_TABLE_MNODES
,
mnodesSchema
,
tListLen
(
mnodesSchema
)},
{
TSDB_INS_TABLE_MODULES
,
modulesSchema
,
tListLen
(
modulesSchema
)},
{
TSDB_INS_TABLE_QNODES
,
qnodesSchema
,
tListLen
(
qnodesSchema
)},
{
TSDB_INS_TABLE_USER_DATABASE
,
userDBSchema
,
tListLen
(
userDBSchema
)},
{
TSDB_INS_TABLE_USER_FUNCTIONS
,
userFuncSchema
,
tListLen
(
userFuncSchema
)},
{
TSDB_INS_TABLE_USER_INDEXES
,
userIdxSchema
,
tListLen
(
userIdxSchema
)},
{
TSDB_INS_TABLE_USER_STABLES
,
userStbsSchema
,
tListLen
(
userStbsSchema
)},
{
TSDB_INS_TABLE_USER_STREAMS
,
userStreamsSchema
,
tListLen
(
userStreamsSchema
)},
{
TSDB_INS_TABLE_USER_TABLES
,
userTblsSchema
,
tListLen
(
userTblsSchema
)},
{
TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED
,
userTblDistSchema
,
tListLen
(
userTblDistSchema
)},
{
TSDB_INS_TABLE_USER_USERS
,
userUsersSchema
,
tListLen
(
userUsersSchema
)},
{
TSDB_INS_TABLE_VGROUPS
,
vgroupsSchema
,
tListLen
(
vgroupsSchema
)},
};
int32_t
mndInitInfosTableSchema
(
const
SInfosTableSchema
*
pSrc
,
int32_t
colNum
,
SSchema
**
pDst
)
{
SSchema
*
schema
=
calloc
(
colNum
,
sizeof
(
SSchema
));
if
(
NULL
==
schema
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
for
(
int32_t
i
=
0
;
i
<
colNum
;
++
i
)
{
strcpy
(
schema
[
i
].
name
,
pSrc
[
i
].
name
);
schema
[
i
].
type
=
pSrc
[
i
].
type
;
schema
[
i
].
colId
=
i
+
1
;
schema
[
i
].
bytes
=
pSrc
[
i
].
bytes
;
}
*
pDst
=
schema
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
mndInsInitMeta
(
SHashObj
*
hash
)
{
STableMetaRsp
meta
=
{
0
};
strcpy
(
meta
.
dbFName
,
TSDB_INFORMATION_SCHEMA_DB
);
meta
.
tableType
=
TSDB_NORMAL_TABLE
;
meta
.
sversion
=
1
;
meta
.
tversion
=
1
;
for
(
int32_t
i
=
0
;
i
<
tListLen
(
infosMeta
);
++
i
)
{
strcpy
(
meta
.
tbName
,
infosMeta
[
i
].
name
);
meta
.
numOfColumns
=
infosMeta
[
i
].
colNum
;
if
(
mndInitInfosTableSchema
(
infosMeta
[
i
].
schema
,
infosMeta
[
i
].
colNum
,
&
meta
.
pSchemas
))
{
return
-
1
;
}
if
(
taosHashPut
(
hash
,
meta
.
tbName
,
strlen
(
meta
.
tbName
),
&
meta
,
sizeof
(
meta
)))
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
mndBuildInsTableSchema
(
SMnode
*
pMnode
,
const
char
*
dbFName
,
const
char
*
tbName
,
STableMetaRsp
*
pRsp
)
{
if
(
NULL
==
pMnode
->
infosMeta
)
{
terrno
=
TSDB_CODE_MND_NOT_READY
;
return
-
1
;
}
STableMetaRsp
*
meta
=
(
STableMetaRsp
*
)
taosHashGet
(
pMnode
->
infosMeta
,
tbName
,
strlen
(
tbName
));
if
(
NULL
==
meta
)
{
mError
(
"invalid information schema table name:%s"
,
tbName
);
terrno
=
TSDB_CODE_MND_INVALID_INFOS_TBL
;
return
-
1
;
}
*
pRsp
=
*
meta
;
pRsp
->
pSchemas
=
calloc
(
meta
->
numOfColumns
,
sizeof
(
SSchema
));
if
(
pRsp
->
pSchemas
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
pRsp
->
pSchemas
=
NULL
;
return
-
1
;
}
memcpy
(
pRsp
->
pSchemas
,
meta
->
pSchemas
,
meta
->
numOfColumns
*
sizeof
(
SSchema
));
return
0
;
}
int32_t
mndInitInfos
(
SMnode
*
pMnode
)
{
pMnode
->
infosMeta
=
taosHashInit
(
20
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
if
(
pMnode
->
infosMeta
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
return
mndInsInitMeta
(
pMnode
->
infosMeta
);
}
void
mndCleanupInfos
(
SMnode
*
pMnode
)
{
if
(
NULL
==
pMnode
->
infosMeta
)
{
return
;
}
void
*
pIter
=
taosHashIterate
(
pMnode
->
infosMeta
,
NULL
);
while
(
pIter
)
{
STableMetaRsp
*
meta
=
(
STableMetaRsp
*
)
pIter
;
tfree
(
meta
->
pSchemas
);
pIter
=
taosHashIterate
(
pMnode
->
infosMeta
,
pIter
);
}
taosHashCleanup
(
pMnode
->
infosMeta
);
pMnode
->
infosMeta
=
NULL
;
}
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
b2ad12cb
...
...
@@ -23,6 +23,7 @@
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "mndInfoSchema.h"
#include "tname.h"
#define TSDB_STB_VER_NUMBER 1
...
...
@@ -38,7 +39,7 @@ static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq);
static
int32_t
mndProcessVCreateStbRsp
(
SMnodeMsg
*
pRsp
);
static
int32_t
mndProcessVAlterStbRsp
(
SMnodeMsg
*
pRsp
);
static
int32_t
mndProcessVDropStbRsp
(
SMnodeMsg
*
pRsp
);
static
int32_t
mndProcess
Stb
MetaReq
(
SMnodeMsg
*
pReq
);
static
int32_t
mndProcess
Table
MetaReq
(
SMnodeMsg
*
pReq
);
static
int32_t
mndGetStbMeta
(
SMnodeMsg
*
pReq
,
SShowObj
*
pShow
,
STableMetaRsp
*
pMeta
);
static
int32_t
mndRetrieveStb
(
SMnodeMsg
*
pReq
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
);
static
void
mndCancelGetNextStb
(
SMnode
*
pMnode
,
void
*
pIter
);
...
...
@@ -58,7 +59,7 @@ int32_t mndInitStb(SMnode *pMnode) {
mndSetMsgHandle
(
pMnode
,
TDMT_VND_CREATE_STB_RSP
,
mndProcessVCreateStbRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_ALTER_STB_RSP
,
mndProcessVAlterStbRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_DROP_STB_RSP
,
mndProcessVDropStbRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_
STB_META
,
mndProcessStb
MetaReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_
TABLE_META
,
mndProcessTable
MetaReq
);
mndAddShowMetaHandle
(
pMnode
,
TSDB_MGMT_TABLE_STB
,
mndGetStbMeta
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_STB
,
mndRetrieveStb
);
...
...
@@ -1310,7 +1311,7 @@ static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char
return
code
;
}
static
int32_t
mndProcess
Stb
MetaReq
(
SMnodeMsg
*
pReq
)
{
static
int32_t
mndProcess
Table
MetaReq
(
SMnodeMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
pMnode
;
int32_t
code
=
-
1
;
STableInfoReq
infoReq
=
{
0
};
...
...
@@ -1321,9 +1322,16 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
goto
RETRIEVE_META_OVER
;
}
mDebug
(
"stb:%s.%s, start to retrieve meta"
,
infoReq
.
dbFName
,
infoReq
.
tbName
);
if
(
mndBuildStbSchema
(
pMnode
,
infoReq
.
dbFName
,
infoReq
.
tbName
,
&
metaRsp
)
!=
0
)
{
goto
RETRIEVE_META_OVER
;
if
(
0
==
strcmp
(
infoReq
.
dbFName
,
TSDB_INFORMATION_SCHEMA_DB
))
{
mDebug
(
"information_schema table:%s.%s, start to retrieve meta"
,
infoReq
.
dbFName
,
infoReq
.
tbName
);
if
(
mndBuildInsTableSchema
(
pMnode
,
infoReq
.
dbFName
,
infoReq
.
tbName
,
&
metaRsp
)
!=
0
)
{
goto
RETRIEVE_META_OVER
;
}
}
else
{
mDebug
(
"stb:%s.%s, start to retrieve meta"
,
infoReq
.
dbFName
,
infoReq
.
tbName
);
if
(
mndBuildStbSchema
(
pMnode
,
infoReq
.
dbFName
,
infoReq
.
tbName
,
&
metaRsp
)
!=
0
)
{
goto
RETRIEVE_META_OVER
;
}
}
int32_t
rspLen
=
tSerializeSTableMetaRsp
(
NULL
,
0
,
&
metaRsp
);
...
...
@@ -1553,4 +1561,4 @@ static int32_t mndRetrieveStb(SMnodeMsg *pReq, SShowObj *pShow, char *data, int3
static
void
mndCancelGetNextStb
(
SMnode
*
pMnode
,
void
*
pIter
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbCancelFetch
(
pSdb
,
pIter
);
}
\ No newline at end of file
}
source/dnode/mnode/impl/src/mnode.c
浏览文件 @
b2ad12cb
...
...
@@ -36,6 +36,7 @@
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "mndInfoSchema.h"
#define MQ_TIMER_MS 3000
#define TRNAS_TIMER_MS 6000
...
...
@@ -221,6 +222,7 @@ static int32_t mndInitSteps(SMnode *pMnode) {
if
(
mndAllocStep
(
pMnode
,
"mnode-offset"
,
mndInitOffset
,
mndCleanupOffset
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-vgroup"
,
mndInitVgroup
,
mndCleanupVgroup
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-stb"
,
mndInitStb
,
mndCleanupStb
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-infos"
,
mndInitInfos
,
mndCleanupInfos
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-db"
,
mndInitDb
,
mndCleanupDb
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-func"
,
mndInitFunc
,
mndCleanupFunc
)
!=
0
)
return
-
1
;
if
(
pMnode
->
clusterId
<=
0
)
{
...
...
@@ -503,9 +505,17 @@ void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
}
}
// Note: uid 0 is reserved
uint64_t
mndGenerateUid
(
char
*
name
,
int32_t
len
)
{
int64_t
us
=
taosGetTimestampUs
();
int32_t
hashval
=
MurmurHash3_32
(
name
,
len
);
uint64_t
x
=
(
us
&
0x000000FFFFFFFFFF
)
<<
24
;
return
x
+
((
hashval
&
((
1ul
<<
16
)
-
1ul
))
<<
8
)
+
(
taosRand
()
&
((
1ul
<<
8
)
-
1ul
));
do
{
int64_t
us
=
taosGetTimestampUs
();
uint64_t
x
=
(
us
&
0x000000FFFFFFFFFF
)
<<
24
;
uint64_t
uuid
=
x
+
((
hashval
&
((
1ul
<<
16
)
-
1ul
))
<<
8
)
+
(
taosRand
()
&
((
1ul
<<
8
)
-
1ul
));
if
(
uuid
)
{
return
uuid
;
}
}
while
(
true
);
}
source/dnode/mnode/impl/test/stb/stb.cpp
浏览文件 @
b2ad12cb
...
...
@@ -339,7 +339,7 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
void
*
pReq
=
rpcMallocCont
(
contLen
);
tSerializeSTableInfoReq
(
pReq
,
contLen
,
&
infoReq
);
SRpcMsg
*
pMsg
=
test
.
SendReq
(
TDMT_MND_
STB
_META
,
pReq
,
contLen
);
SRpcMsg
*
pMsg
=
test
.
SendReq
(
TDMT_MND_
TABLE
_META
,
pReq
,
contLen
);
ASSERT_NE
(
pMsg
,
nullptr
);
ASSERT_EQ
(
pMsg
->
code
,
0
);
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
b2ad12cb
...
...
@@ -189,11 +189,21 @@ typedef struct SCtgAction {
#define CTG_IS_META_TABLE(type) ((type) == META_TYPE_TABLE)
#define CTG_IS_META_BOTH(type) ((type) == META_TYPE_BOTH_TABLE)
#define CTG_IS_STABLE(isSTable) (1 == (isSTable))
#define CTG_IS_NOT_STABLE(isSTable) (0 == (isSTable))
#define CTG_IS_UNKNOWN_STABLE(isSTable) ((isSTable) < 0)
#define CTG_SET_STABLE(isSTable, tbType) do { (isSTable) = ((tbType) == TSDB_SUPER_TABLE) ? 1 : ((tbType) > TSDB_SUPER_TABLE ? 0 : -1); } while (0)
#define CTG_TBTYPE_MATCH(isSTable, tbType) (CTG_IS_UNKNOWN_STABLE(isSTable) || (CTG_IS_STABLE(isSTable) && (tbType) == TSDB_SUPER_TABLE) || (CTG_IS_NOT_STABLE(isSTable) && (tbType) != TSDB_SUPER_TABLE))
#define CTG_FLAG_STB 0x1
#define CTG_FLAG_NOT_STB 0x2
#define CTG_FLAG_UNKNOWN_STB 0x4
#define CTG_FLAG_INF_DB 0x8
#define CTG_IS_STB(_flag) ((_flag) & CTG_FLAG_STB)
#define CTG_IS_NOT_STB(_flag) ((_flag) & CTG_FLAG_NOT_STB)
#define CTG_IS_UNKNOWN_STB(_flag) ((_flag) & CTG_FLAG_UNKNOWN_STB)
#define CTG_IS_INF_DB(_flag) ((_flag) & CTG_FLAG_INF_DB)
#define CTG_SET_INF_DB(_flag) ((_flag) |= CTG_FLAG_INF_DB)
#define CTG_SET_STB(_flag, tbType) do { (_flag) |= ((tbType) == TSDB_SUPER_TABLE) ? CTG_FLAG_STB : ((tbType) > TSDB_SUPER_TABLE ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB); } while (0)
#define CTG_GEN_STB_FLAG(_isStb) ((_isStb) == 1) ? CTG_FLAG_STB : ((_isStb) == 0 ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB)
#define CTG_TBTYPE_MATCH(_flag, tbType) (CTG_IS_UNKNOWN_STB(_flag) || (CTG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || (CTG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE))
#define CTG_IS_INF_DBNAME(_dbname) ((*(_dbname) == 'i') && (0 == strcmp(_dbname, TSDB_INFORMATION_SCHEMA_DB)))
#define CTG_META_SIZE(pMeta) (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema))
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
b2ad12cb
...
...
@@ -509,7 +509,7 @@ int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName,
}
int32_t
ctgGetTableMetaFromCache
(
SCatalog
*
pCtg
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
,
int32_t
*
exist
)
{
int32_t
ctgGetTableMetaFromCache
(
SCatalog
*
pCtg
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
,
int32_t
*
exist
,
int32_t
flag
)
{
if
(
NULL
==
pCtg
->
dbCache
)
{
*
exist
=
0
;
ctgWarn
(
"empty tbmeta cache, tbName:%s"
,
pTableName
->
tname
);
...
...
@@ -517,7 +517,11 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable
}
char
dbFName
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
tNameGetFullDbName
(
pTableName
,
dbFName
);
if
(
CTG_IS_INF_DB
(
flag
))
{
strcpy
(
dbFName
,
pTableName
->
dbname
);
}
else
{
tNameGetFullDbName
(
pTableName
,
dbFName
);
}
*
pTableMeta
=
NULL
;
...
...
@@ -590,15 +594,19 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgGetTableTypeFromCache
(
SCatalog
*
pCtg
,
const
SName
*
pTableName
,
int32_t
*
tbType
)
{
int32_t
ctgGetTableTypeFromCache
(
SCatalog
*
pCtg
,
const
SName
*
pTableName
,
int32_t
*
tbType
,
int32_t
flag
)
{
if
(
NULL
==
pCtg
->
dbCache
)
{
ctgWarn
(
"empty db cache, tbName:%s"
,
pTableName
->
tname
);
return
TSDB_CODE_SUCCESS
;
}
char
dbFName
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
tNameGetFullDbName
(
pTableName
,
dbFName
);
if
(
CTG_IS_INF_DB
(
flag
))
{
strcpy
(
dbFName
,
pTableName
->
dbname
);
}
else
{
tNameGetFullDbName
(
pTableName
,
dbFName
);
}
SCtgDBCache
*
dbCache
=
NULL
;
ctgAcquireDBCache
(
pCtg
,
dbFName
,
&
dbCache
);
if
(
NULL
==
dbCache
)
{
...
...
@@ -629,7 +637,7 @@ int32_t ctgGetTableTypeFromCache(SCatalog* pCtg, const SName* pTableName, int32_
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgGetTableMetaFromMnodeImpl
(
SCatalog
*
pCtg
,
void
*
pTrans
porter
,
const
SEpSet
*
pMgmtEps
,
char
*
dbFName
,
char
*
tbName
,
STableMetaOutput
*
output
)
{
int32_t
ctgGetTableMetaFromMnodeImpl
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
char
*
dbFName
,
char
*
tbName
,
STableMetaOutput
*
output
)
{
SBuildTableMetaInput
bInput
=
{.
vgId
=
0
,
.
dbFName
=
dbFName
,
.
tbName
=
tbName
};
char
*
msg
=
NULL
;
SEpSet
*
pVnodeEpSet
=
NULL
;
...
...
@@ -637,21 +645,21 @@ int32_t ctgGetTableMetaFromMnodeImpl(SCatalog* pCtg, void *pTransporter, const S
ctgDebug
(
"try to get table meta from mnode, dbFName:%s, tbName:%s"
,
dbFName
,
tbName
);
int32_t
code
=
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_
STB
_META
)](
&
bInput
,
&
msg
,
0
,
&
msgLen
);
int32_t
code
=
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_
TABLE
_META
)](
&
bInput
,
&
msg
,
0
,
&
msgLen
);
if
(
code
)
{
ctgError
(
"Build mnode stablemeta msg failed, code:%x"
,
code
);
CTG_ERR_RET
(
code
);
}
SRpcMsg
rpcMsg
=
{
.
msgType
=
TDMT_MND_
STB
_META
,
.
msgType
=
TDMT_MND_
TABLE
_META
,
.
pCont
=
msg
,
.
contLen
=
msgLen
,
};
SRpcMsg
rpcRsp
=
{
0
};
rpcSendRecv
(
pTrans
porter
,
(
SEpSet
*
)
pMgmtEps
,
&
rpcMsg
,
&
rpcRsp
);
rpcSendRecv
(
pTrans
,
(
SEpSet
*
)
pMgmtEps
,
&
rpcMsg
,
&
rpcRsp
);
if
(
TSDB_CODE_SUCCESS
!=
rpcRsp
.
code
)
{
if
(
CTG_TABLE_NOT_EXIST
(
rpcRsp
.
code
))
{
...
...
@@ -664,7 +672,7 @@ int32_t ctgGetTableMetaFromMnodeImpl(SCatalog* pCtg, void *pTransporter, const S
CTG_ERR_RET
(
rpcRsp
.
code
);
}
code
=
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_
STB
_META
)](
output
,
rpcRsp
.
pCont
,
rpcRsp
.
contLen
);
code
=
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_
TABLE
_META
)](
output
,
rpcRsp
.
pCont
,
rpcRsp
.
contLen
);
if
(
code
)
{
ctgError
(
"Process mnode stablemeta rsp failed, code:%x, dbFName:%s, tbName:%s"
,
code
,
dbFName
,
tbName
);
CTG_ERR_RET
(
code
);
...
...
@@ -675,15 +683,15 @@ int32_t ctgGetTableMetaFromMnodeImpl(SCatalog* pCtg, void *pTransporter, const S
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgGetTableMetaFromMnode
(
SCatalog
*
pCtg
,
void
*
pTrans
porter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMetaOutput
*
output
)
{
int32_t
ctgGetTableMetaFromMnode
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMetaOutput
*
output
)
{
char
dbFName
[
TSDB_DB_FNAME_LEN
];
tNameGetFullDbName
(
pTableName
,
dbFName
);
return
ctgGetTableMetaFromMnodeImpl
(
pCtg
,
pTrans
porter
,
pMgmtEps
,
dbFName
,
(
char
*
)
pTableName
->
tname
,
output
);
return
ctgGetTableMetaFromMnodeImpl
(
pCtg
,
pTrans
,
pMgmtEps
,
dbFName
,
(
char
*
)
pTableName
->
tname
,
output
);
}
int32_t
ctgGetTableMetaFromVnode
(
SCatalog
*
pCtg
,
void
*
pTrans
porter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
SVgroupInfo
*
vgroupInfo
,
STableMetaOutput
*
output
)
{
if
(
NULL
==
pCtg
||
NULL
==
pTrans
porter
||
NULL
==
pMgmtEps
||
NULL
==
pTableName
||
NULL
==
vgroupInfo
||
NULL
==
output
)
{
int32_t
ctgGetTableMetaFromVnode
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
SVgroupInfo
*
vgroupInfo
,
STableMetaOutput
*
output
)
{
if
(
NULL
==
pCtg
||
NULL
==
pTrans
||
NULL
==
pMgmtEps
||
NULL
==
pTableName
||
NULL
==
vgroupInfo
||
NULL
==
output
)
{
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
...
...
@@ -709,7 +717,7 @@ int32_t ctgGetTableMetaFromVnode(SCatalog* pCtg, void *pTransporter, const SEpSe
};
SRpcMsg
rpcRsp
=
{
0
};
rpcSendRecv
(
pTrans
porter
,
&
vgroupInfo
->
epset
,
&
rpcMsg
,
&
rpcRsp
);
rpcSendRecv
(
pTrans
,
&
vgroupInfo
->
epset
,
&
rpcMsg
,
&
rpcRsp
);
if
(
TSDB_CODE_SUCCESS
!=
rpcRsp
.
code
)
{
if
(
CTG_TABLE_NOT_EXIST
(
rpcRsp
.
code
))
{
...
...
@@ -829,7 +837,7 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName
CTG_RET
(
code
);
}
int32_t
ctgS
Table
VersionCompare
(
const
void
*
key1
,
const
void
*
key2
)
{
int32_t
ctgS
tb
VersionCompare
(
const
void
*
key1
,
const
void
*
key2
)
{
if
(
*
(
uint64_t
*
)
key1
<
((
SSTableMetaVersion
*
)
key2
)
->
suid
)
{
return
-
1
;
}
else
if
(
*
(
uint64_t
*
)
key1
>
((
SSTableMetaVersion
*
)
key2
)
->
suid
)
{
...
...
@@ -1078,6 +1086,10 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
ctgDebug
(
"db added to cache, dbFName:%s, dbId:%"
PRIx64
,
dbFName
,
dbId
);
if
(
CTG_IS_INF_DBNAME
(
dbFName
))
{
return
TSDB_CODE_SUCCESS
;
}
CTG_ERR_RET
(
ctgMetaRentAdd
(
&
pCtg
->
dbRent
,
&
vgVersion
,
dbId
,
sizeof
(
SDbVgVersion
)));
ctgDebug
(
"db added to rent, dbFName:%s, vgVersion:%d, dbId:%"
PRIx64
,
dbFName
,
vgVersion
.
vgVersion
,
dbId
);
...
...
@@ -1100,7 +1112,7 @@ void ctgRemoveStbRent(SCatalog* pCtg, SCtgTbMetaCache *cache) {
uint64_t
*
suid
=
NULL
;
taosHashGetKey
(
pIter
,
(
void
**
)
&
suid
,
NULL
);
if
(
TSDB_CODE_SUCCESS
==
ctgMetaRentRemove
(
&
pCtg
->
stbRent
,
*
suid
,
ctgS
Table
VersionCompare
))
{
if
(
TSDB_CODE_SUCCESS
==
ctgMetaRentRemove
(
&
pCtg
->
stbRent
,
*
suid
,
ctgS
tb
VersionCompare
))
{
ctgDebug
(
"stb removed from rent, suid:%"
PRIx64
,
*
suid
);
}
...
...
@@ -1258,7 +1270,7 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui
ctgDebug
(
"stb removed from stbCache, dbFName:%s, stb:%s, suid:%"
PRIx64
,
dbFName
,
tbName
,
orig
->
suid
);
ctgMetaRentRemove
(
&
pCtg
->
stbRent
,
orig
->
suid
,
ctgS
Table
VersionCompare
);
ctgMetaRentRemove
(
&
pCtg
->
stbRent
,
orig
->
suid
,
ctgS
tb
VersionCompare
);
}
}
...
...
@@ -1428,15 +1440,17 @@ int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput)
int32_t
ctgRefreshTblMeta
(
SCatalog
*
pCtg
,
void
*
pTrans
porter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
int32_t
isSTable
,
STableMetaOutput
**
pOutput
)
{
if
(
NULL
==
pCtg
||
NULL
==
pTrans
porter
||
NULL
==
pMgmtEps
||
NULL
==
pTableName
)
{
int32_t
ctgRefreshTblMeta
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
int32_t
flag
,
STableMetaOutput
**
pOutput
)
{
if
(
NULL
==
pCtg
||
NULL
==
pTrans
||
NULL
==
pMgmtEps
||
NULL
==
pTableName
)
{
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
SVgroupInfo
vgroupInfo
=
{
0
};
int32_t
code
=
0
;
CTG_ERR_RET
(
catalogGetTableHashVgroup
(
pCtg
,
pTransporter
,
pMgmtEps
,
pTableName
,
&
vgroupInfo
));
if
(
!
CTG_IS_INF_DB
(
flag
))
{
CTG_ERR_RET
(
catalogGetTableHashVgroup
(
pCtg
,
pTrans
,
pMgmtEps
,
pTableName
,
&
vgroupInfo
));
}
SCtgUpdateTblMsg
*
msg
=
NULL
;
STableMetaOutput
moutput
=
{
0
};
...
...
@@ -1445,33 +1459,37 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgm
ctgError
(
"malloc %d failed"
,
(
int32_t
)
sizeof
(
STableMetaOutput
));
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
if
(
CTG_IS_STABLE
(
isSTable
))
{
if
(
CTG_IS_INF_DB
(
flag
))
{
ctgDebug
(
"will refresh tbmeta, supposed in information_schema, tbName:%s"
,
tNameGetTableName
(
pTableName
));
CTG_ERR_JRET
(
ctgGetTableMetaFromMnodeImpl
(
pCtg
,
pTrans
,
pMgmtEps
,
(
char
*
)
pTableName
->
dbname
,
(
char
*
)
pTableName
->
tname
,
output
));
}
else
if
(
CTG_IS_STB
(
flag
))
{
ctgDebug
(
"will refresh tbmeta, supposed to be stb, tbName:%s"
,
tNameGetTableName
(
pTableName
));
// if get from mnode failed, will not try vnode
CTG_ERR_JRET
(
ctgGetTableMetaFromMnode
(
pCtg
,
pTrans
porter
,
pMgmtEps
,
pTableName
,
output
));
CTG_ERR_JRET
(
ctgGetTableMetaFromMnode
(
pCtg
,
pTrans
,
pMgmtEps
,
pTableName
,
output
));
if
(
CTG_IS_META_NULL
(
output
->
metaType
))
{
CTG_ERR_JRET
(
ctgGetTableMetaFromVnode
(
pCtg
,
pTrans
porter
,
pMgmtEps
,
pTableName
,
&
vgroupInfo
,
output
));
CTG_ERR_JRET
(
ctgGetTableMetaFromVnode
(
pCtg
,
pTrans
,
pMgmtEps
,
pTableName
,
&
vgroupInfo
,
output
));
}
}
else
{
ctgDebug
(
"will refresh tbmeta, not supposed to be stb, tbName:%s,
isStable:%d"
,
tNameGetTableName
(
pTableName
),
isSTable
);
ctgDebug
(
"will refresh tbmeta, not supposed to be stb, tbName:%s,
flag:%d"
,
tNameGetTableName
(
pTableName
),
flag
);
// if get from vnode failed or no table meta, will not try mnode
CTG_ERR_JRET
(
ctgGetTableMetaFromVnode
(
pCtg
,
pTrans
porter
,
pMgmtEps
,
pTableName
,
&
vgroupInfo
,
output
));
CTG_ERR_JRET
(
ctgGetTableMetaFromVnode
(
pCtg
,
pTrans
,
pMgmtEps
,
pTableName
,
&
vgroupInfo
,
output
));
if
(
CTG_IS_META_TABLE
(
output
->
metaType
)
&&
TSDB_SUPER_TABLE
==
output
->
tbMeta
->
tableType
)
{
ctgDebug
(
"will continue to refresh tbmeta since got stb, tbName:%s, metaType:%d"
,
tNameGetTableName
(
pTableName
),
output
->
metaType
);
tfree
(
output
->
tbMeta
);
CTG_ERR_JRET
(
ctgGetTableMetaFromMnodeImpl
(
pCtg
,
pTrans
porter
,
pMgmtEps
,
output
->
dbFName
,
output
->
tbName
,
output
));
CTG_ERR_JRET
(
ctgGetTableMetaFromMnodeImpl
(
pCtg
,
pTrans
,
pMgmtEps
,
output
->
dbFName
,
output
->
tbName
,
output
));
}
else
if
(
CTG_IS_META_BOTH
(
output
->
metaType
))
{
int32_t
exist
=
0
;
CTG_ERR_JRET
(
ctgIsTableMetaExistInCache
(
pCtg
,
output
->
dbFName
,
output
->
tbName
,
&
exist
));
if
(
0
==
exist
)
{
CTG_ERR_JRET
(
ctgGetTableMetaFromMnodeImpl
(
pCtg
,
pTrans
porter
,
pMgmtEps
,
output
->
dbFName
,
output
->
tbName
,
&
moutput
));
CTG_ERR_JRET
(
ctgGetTableMetaFromMnodeImpl
(
pCtg
,
pTrans
,
pMgmtEps
,
output
->
dbFName
,
output
->
tbName
,
&
moutput
));
if
(
CTG_IS_META_NULL
(
moutput
.
metaType
))
{
SET_META_TYPE_NULL
(
output
->
metaType
);
...
...
@@ -1530,7 +1548,7 @@ _return:
CTG_RET
(
code
);
}
int32_t
ctgGetTableMeta
(
SCatalog
*
pCtg
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
bool
forceUpdate
,
STableMeta
**
pTableMeta
,
int32_t
isSTable
)
{
int32_t
ctgGetTableMeta
(
SCatalog
*
pCtg
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
bool
forceUpdate
,
STableMeta
**
pTableMeta
,
int32_t
flag
)
{
if
(
NULL
==
pCtg
||
NULL
==
pRpc
||
NULL
==
pMgmtEps
||
NULL
==
pTableName
||
NULL
==
pTableMeta
)
{
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
...
...
@@ -1538,26 +1556,30 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
int32_t
exist
=
0
;
int32_t
code
=
0
;
if
(
!
forceUpdate
)
{
CTG_ERR_RET
(
ctgGetTableMetaFromCache
(
pCtg
,
pTableName
,
pTableMeta
,
&
exist
));
if
(
CTG_IS_INF_DBNAME
(
pTableName
->
dbname
))
{
CTG_SET_INF_DB
(
flag
);
}
if
(
exist
&&
CTG_TBTYPE_MATCH
(
isSTable
,
(
*
pTableMeta
)
->
tableType
))
{
if
((
!
forceUpdate
)
||
(
CTG_IS_INF_DB
(
flag
)))
{
CTG_ERR_RET
(
ctgGetTableMetaFromCache
(
pCtg
,
pTableName
,
pTableMeta
,
&
exist
,
flag
));
if
(
exist
&&
CTG_TBTYPE_MATCH
(
flag
,
(
*
pTableMeta
)
->
tableType
))
{
return
TSDB_CODE_SUCCESS
;
}
tfree
(
*
pTableMeta
);
}
else
if
(
CTG_IS_UNKNOWN_ST
ABLE
(
isSTable
))
{
}
else
if
(
CTG_IS_UNKNOWN_ST
B
(
flag
))
{
int32_t
tbType
=
0
;
CTG_ERR_RET
(
ctgGetTableTypeFromCache
(
pCtg
,
pTableName
,
&
tbType
));
CTG_ERR_RET
(
ctgGetTableTypeFromCache
(
pCtg
,
pTableName
,
&
tbType
,
flag
));
CTG_SET_ST
ABLE
(
isSTable
,
tbType
);
CTG_SET_ST
B
(
flag
,
tbType
);
}
STableMetaOutput
*
output
=
NULL
;
while
(
true
)
{
CTG_ERR_JRET
(
ctgRefreshTblMeta
(
pCtg
,
pRpc
,
pMgmtEps
,
pTableName
,
isSTable
,
&
output
));
CTG_ERR_JRET
(
ctgRefreshTblMeta
(
pCtg
,
pRpc
,
pMgmtEps
,
pTableName
,
flag
,
&
output
));
if
(
CTG_IS_META_TABLE
(
output
->
metaType
))
{
*
pTableMeta
=
output
->
tbMeta
;
...
...
@@ -1582,7 +1604,7 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
SName
stbName
=
*
pTableName
;
strcpy
(
stbName
.
tname
,
output
->
tbName
);
CTG_ERR_JRET
(
ctgGetTableMetaFromCache
(
pCtg
,
&
stbName
,
pTableMeta
,
&
exist
));
CTG_ERR_JRET
(
ctgGetTableMetaFromCache
(
pCtg
,
&
stbName
,
pTableMeta
,
&
exist
,
flag
));
if
(
0
==
exist
)
{
ctgDebug
(
"stb no longer exist, dbFName:%s, tbName:%s"
,
output
->
dbFName
,
pTableName
->
tname
);
continue
;
...
...
@@ -1663,6 +1685,11 @@ int32_t ctgActUpdateTbl(SCtgMetaAction *action) {
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
char
*
p
=
strchr
(
output
->
dbFName
,
'.'
);
if
(
p
&&
CTG_IS_INF_DBNAME
(
p
+
1
))
{
memmove
(
output
->
dbFName
,
p
+
1
,
strlen
(
p
+
1
));
}
CTG_ERR_JRET
(
ctgGetAddDBCache
(
pCtg
,
output
->
dbFName
,
output
->
dbId
,
&
dbCache
));
if
(
NULL
==
dbCache
)
{
ctgInfo
(
"conflict db update, ignore this update, dbFName:%s, dbId:%"
PRIx64
,
output
->
dbFName
,
output
->
dbId
);
...
...
@@ -1729,7 +1756,7 @@ int32_t ctgActRemoveStb(SCtgMetaAction *action) {
ctgInfo
(
"stb removed from cache, dbFName:%s, stbName:%s, suid:%"
PRIx64
,
msg
->
dbFName
,
msg
->
stbName
,
msg
->
suid
);
CTG_ERR_JRET
(
ctgMetaRentRemove
(
&
msg
->
pCtg
->
stbRent
,
msg
->
suid
,
ctgS
Table
VersionCompare
));
CTG_ERR_JRET
(
ctgMetaRentRemove
(
&
msg
->
pCtg
->
stbRent
,
msg
->
suid
,
ctgS
tb
VersionCompare
));
ctgDebug
(
"stb removed from rent, dbFName:%s, stbName:%s, suid:%"
PRIx64
,
msg
->
dbFName
,
msg
->
stbName
,
msg
->
suid
);
...
...
@@ -2140,16 +2167,16 @@ _return:
}
int32_t
catalogGetTableMeta
(
SCatalog
*
pCtg
,
void
*
pTrans
porter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
)
{
int32_t
catalogGetTableMeta
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
)
{
CTG_API_ENTER
();
CTG_API_LEAVE
(
ctgGetTableMeta
(
pCtg
,
pTrans
porter
,
pMgmtEps
,
pTableName
,
false
,
pTableMeta
,
-
1
));
CTG_API_LEAVE
(
ctgGetTableMeta
(
pCtg
,
pTrans
,
pMgmtEps
,
pTableName
,
false
,
pTableMeta
,
CTG_FLAG_UNKNOWN_STB
));
}
int32_t
catalogGetSTableMeta
(
SCatalog
*
pCtg
,
void
*
pTrans
porter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
)
{
int32_t
catalogGetSTableMeta
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
)
{
CTG_API_ENTER
();
CTG_API_LEAVE
(
ctgGetTableMeta
(
pCtg
,
pTrans
porter
,
pMgmtEps
,
pTableName
,
false
,
pTableMeta
,
1
));
CTG_API_LEAVE
(
ctgGetTableMeta
(
pCtg
,
pTrans
,
pMgmtEps
,
pTableName
,
false
,
pTableMeta
,
CTG_FLAG_STB
));
}
int32_t
catalogUpdateSTableMeta
(
SCatalog
*
pCtg
,
STableMetaRsp
*
rspMsg
)
{
...
...
@@ -2204,20 +2231,20 @@ _return:
}
int32_t
catalogRefreshTableMeta
(
SCatalog
*
pCtg
,
void
*
pTrans
porter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
int32_t
isSTable
)
{
int32_t
catalogRefreshTableMeta
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
int32_t
isSTable
)
{
CTG_API_ENTER
();
if
(
NULL
==
pCtg
||
NULL
==
pTrans
porter
||
NULL
==
pMgmtEps
||
NULL
==
pTableName
)
{
if
(
NULL
==
pCtg
||
NULL
==
pTrans
||
NULL
==
pMgmtEps
||
NULL
==
pTableName
)
{
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
CTG_API_LEAVE
(
ctgRefreshTblMeta
(
pCtg
,
pTrans
porter
,
pMgmtEps
,
pTableName
,
isSTable
,
NULL
));
CTG_API_LEAVE
(
ctgRefreshTblMeta
(
pCtg
,
pTrans
,
pMgmtEps
,
pTableName
,
CTG_GEN_STB_FLAG
(
isSTable
)
,
NULL
));
}
int32_t
catalogRefreshGetTableMeta
(
SCatalog
*
pCtg
,
void
*
pTrans
porter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
,
int32_t
isSTable
)
{
int32_t
catalogRefreshGetTableMeta
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
,
int32_t
isSTable
)
{
CTG_API_ENTER
();
CTG_API_LEAVE
(
ctgGetTableMeta
(
pCtg
,
pTrans
porter
,
pMgmtEps
,
pTableName
,
true
,
pTableMeta
,
isSTable
));
CTG_API_LEAVE
(
ctgGetTableMeta
(
pCtg
,
pTrans
,
pMgmtEps
,
pTableName
,
true
,
pTableMeta
,
CTG_GEN_STB_FLAG
(
isSTable
)
));
}
int32_t
catalogGetTableDistVgInfo
(
SCatalog
*
pCtg
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
SArray
**
pVgList
)
{
...
...
@@ -2226,6 +2253,11 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgm
if
(
NULL
==
pCtg
||
NULL
==
pRpc
||
NULL
==
pMgmtEps
||
NULL
==
pTableName
||
NULL
==
pVgList
)
{
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
if
(
CTG_IS_INF_DBNAME
(
pTableName
->
dbname
))
{
ctgError
(
"no valid vgInfo for db, dbname:%s"
,
pTableName
->
dbname
);
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
STableMeta
*
tbMeta
=
NULL
;
int32_t
code
=
0
;
...
...
@@ -2236,7 +2268,7 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgm
*
pVgList
=
NULL
;
CTG_ERR_JRET
(
ctgGetTableMeta
(
pCtg
,
pRpc
,
pMgmtEps
,
pTableName
,
false
,
&
tbMeta
,
-
1
));
CTG_ERR_JRET
(
ctgGetTableMeta
(
pCtg
,
pRpc
,
pMgmtEps
,
pTableName
,
false
,
&
tbMeta
,
CTG_FLAG_UNKNOWN_STB
));
char
db
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
tNameGetFullDbName
(
pTableName
,
db
);
...
...
@@ -2307,16 +2339,21 @@ _return:
}
int32_t
catalogGetTableHashVgroup
(
SCatalog
*
pCtg
,
void
*
pTrans
porter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
SVgroupInfo
*
pVgroup
)
{
int32_t
catalogGetTableHashVgroup
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
SVgroupInfo
*
pVgroup
)
{
CTG_API_ENTER
();
if
(
CTG_IS_INF_DBNAME
(
pTableName
->
dbname
))
{
ctgError
(
"no valid vgInfo for db, dbname:%s"
,
pTableName
->
dbname
);
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
SCtgDBCache
*
dbCache
=
NULL
;
int32_t
code
=
0
;
char
db
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
tNameGetFullDbName
(
pTableName
,
db
);
SDBVgInfo
*
vgInfo
=
NULL
;
CTG_ERR_JRET
(
ctgGetDBVgInfo
(
pCtg
,
pTrans
porter
,
pMgmtEps
,
db
,
false
,
&
dbCache
,
&
vgInfo
));
CTG_ERR_JRET
(
ctgGetDBVgInfo
(
pCtg
,
pTrans
,
pMgmtEps
,
db
,
false
,
&
dbCache
,
&
vgInfo
));
CTG_ERR_JRET
(
ctgGetVgInfoFromHashValue
(
pCtg
,
vgInfo
?
vgInfo
:
dbCache
->
vgInfo
,
pTableName
,
pVgroup
));
...
...
@@ -2336,10 +2373,10 @@ _return:
}
int32_t
catalogGetAllMeta
(
SCatalog
*
pCtg
,
void
*
pTrans
porter
,
const
SEpSet
*
pMgmtEps
,
const
SCatalogReq
*
pReq
,
SMetaData
*
pRsp
)
{
int32_t
catalogGetAllMeta
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
const
SCatalogReq
*
pReq
,
SMetaData
*
pRsp
)
{
CTG_API_ENTER
();
if
(
NULL
==
pCtg
||
NULL
==
pTrans
porter
||
NULL
==
pMgmtEps
||
NULL
==
pReq
||
NULL
==
pRsp
)
{
if
(
NULL
==
pCtg
||
NULL
==
pTrans
||
NULL
==
pMgmtEps
||
NULL
==
pReq
||
NULL
==
pRsp
)
{
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
...
...
@@ -2363,7 +2400,7 @@ int32_t catalogGetAllMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgm
SName
*
name
=
taosArrayGet
(
pReq
->
pTableName
,
i
);
STableMeta
*
pTableMeta
=
NULL
;
CTG_ERR_JRET
(
ctgGetTableMeta
(
pCtg
,
pTrans
porter
,
pMgmtEps
,
name
,
false
,
&
pTableMeta
,
-
1
));
CTG_ERR_JRET
(
ctgGetTableMeta
(
pCtg
,
pTrans
,
pMgmtEps
,
name
,
false
,
&
pTableMeta
,
CTG_FLAG_UNKNOWN_STB
));
if
(
NULL
==
taosArrayPush
(
pRsp
->
pTableMeta
,
&
pTableMeta
))
{
ctgError
(
"taosArrayPush failed, idx:%d"
,
i
);
...
...
source/libs/catalog/test/catalogTests.cpp
浏览文件 @
b2ad12cb
...
...
@@ -38,7 +38,7 @@
namespace
{
extern
"C"
int32_t
ctgGetTableMetaFromCache
(
struct
SCatalog
*
pCatalog
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
,
int32_t
*
exist
);
int32_t
*
exist
,
int32_t
flag
);
extern
"C"
int32_t
ctgDbgGetClusterCacheNum
(
struct
SCatalog
*
pCatalog
,
int32_t
type
);
extern
"C"
int32_t
ctgActUpdateTbl
(
SCtgMetaAction
*
action
);
extern
"C"
int32_t
ctgDbgEnableDebug
(
char
*
option
);
...
...
@@ -243,6 +243,8 @@ void ctgTestBuildSTableMetaRsp(STableMetaRsp *rspMsg) {
rspMsg
->
suid
=
ctgTestSuid
+
1
;
rspMsg
->
tuid
=
ctgTestSuid
+
1
;
rspMsg
->
vgId
=
1
;
rspMsg
->
pSchemas
=
(
SSchema
*
)
calloc
(
rspMsg
->
numOfTags
+
rspMsg
->
numOfColumns
,
sizeof
(
SSchema
));
SSchema
*
s
=
NULL
;
s
=
&
rspMsg
->
pSchemas
[
0
];
...
...
@@ -770,7 +772,7 @@ void *ctgTestGetCtableMetaThread(void *param) {
strcpy
(
cn
.
tname
,
ctgTestCTablename
);
while
(
!
ctgTestStop
)
{
code
=
ctgGetTableMetaFromCache
(
pCtg
,
&
cn
,
&
tbMeta
,
&
exist
);
code
=
ctgGetTableMetaFromCache
(
pCtg
,
&
cn
,
&
tbMeta
,
&
exist
,
0
);
if
(
code
||
0
==
exist
)
{
assert
(
0
);
}
...
...
@@ -827,6 +829,7 @@ void *ctgTestSetCtableMetaThread(void *param) {
#if 0
TEST(tableMeta, normalTable) {
struct SCatalog *pCtg = NULL;
void *mockPointer = (void *)0x1;
...
...
@@ -1289,6 +1292,7 @@ TEST(tableMeta, updateStbMeta) {
code = catalogUpdateSTableMeta(pCtg, &rsp);
ASSERT_EQ(code, 0);
tfree(rsp.pSchemas);
while (true) {
uint64_t n = 0;
...
...
source/libs/nodes/src/nodesTraverseFuncs.c
浏览文件 @
b2ad12cb
...
...
@@ -105,6 +105,9 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
case
QUERY_NODE_RAW_EXPR
:
res
=
walkNode
(((
SRawExprNode
*
)
pNode
)
->
pNode
,
order
,
walker
,
pContext
);
break
;
case
QUERY_NODE_TARGET
:
res
=
walkNode
(((
STargetNode
*
)
pNode
)
->
pExpr
,
order
,
walker
,
pContext
);
break
;
default:
break
;
}
...
...
@@ -228,6 +231,9 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
case
QUERY_NODE_RAW_EXPR
:
res
=
rewriteNode
(
&
(((
SRawExprNode
*
)
pNode
)
->
pNode
),
order
,
rewriter
,
pContext
);
break
;
case
QUERY_NODE_TARGET
:
res
=
rewriteNode
(
&
(((
STargetNode
*
)
pNode
)
->
pExpr
),
order
,
rewriter
,
pContext
);
break
;
default:
break
;
}
...
...
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
b2ad12cb
...
...
@@ -101,20 +101,47 @@ SNode* nodesMakeNode(ENodeType type) {
return
NULL
;
}
static
EDealRes
destroyNode
(
SNode
*
pNode
,
void
*
pContext
)
{
switch
(
nodeType
(
pNode
))
{
case
QUERY_NODE_VALUE
:
tfree
(((
SValueNode
*
)
pNode
)
->
literal
);
static
EDealRes
destroyNode
(
SNode
**
pNode
,
void
*
pContext
)
{
if
(
NULL
==
pNode
||
NULL
==
*
pNode
)
{
return
DEAL_RES_IGNORE_CHILD
;
}
switch
(
nodeType
(
*
pNode
))
{
case
QUERY_NODE_VALUE
:
{
SValueNode
*
pValue
=
(
SValueNode
*
)
*
pNode
;
tfree
(
pValue
->
literal
);
if
(
IS_VAR_DATA_TYPE
(
pValue
->
node
.
resType
.
type
))
{
tfree
(
pValue
->
datum
.
p
);
}
break
;
}
case
QUERY_NODE_LOGIC_CONDITION
:
nodesDestroyList
(((
SLogicConditionNode
*
)(
*
pNode
))
->
pParameterList
);
break
;
case
QUERY_NODE_FUNCTION
:
nodesDestroyList
(((
SFunctionNode
*
)(
*
pNode
))
->
pParameterList
);
break
;
case
QUERY_NODE_GROUPING_SET
:
nodesDestroyList
(((
SGroupingSetNode
*
)(
*
pNode
))
->
pParameterList
);
break
;
case
QUERY_NODE_NODE_LIST
:
nodesDestroyList
(((
SNodeListNode
*
)(
*
pNode
))
->
pNodeList
);
break
;
default:
break
;
}
tfree
(
pNode
);
tfree
(
*
pNode
);
return
DEAL_RES_CONTINUE
;
}
void
nodesDestroyNode
(
SNode
*
pNode
)
{
nodesWalkNodePostOrder
(
pNode
,
destroyNode
,
NULL
);
if
(
NULL
==
pNode
)
{
return
;
}
nodesRewriteNodePostOrder
(
&
pNode
,
destroyNode
,
NULL
);
}
SNodeList
*
nodesMakeList
()
{
...
...
@@ -191,9 +218,9 @@ SNode* nodesListGetNode(SNodeList* pList, int32_t index) {
}
void
nodesDestroyList
(
SNodeList
*
pList
)
{
S
Node
*
node
;
FOREACH
(
node
,
pLis
t
)
{
nodesDestroyNode
(
node
);
S
ListCell
*
pNext
=
pList
->
pHead
;
while
(
NULL
!=
pNex
t
)
{
pNext
=
nodesListErase
(
pList
,
pNext
);
}
tfree
(
pList
);
}
...
...
source/libs/qcom/src/querymsg.c
浏览文件 @
b2ad12cb
...
...
@@ -198,7 +198,7 @@ int32_t queryCreateTableMetaFromMsg(STableMetaRsp *msg, bool isSuperTable, STabl
}
int32_t
queryProcessTableMetaRsp
(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
{
int32_t
code
=
-
1
;
int32_t
code
=
0
;
STableMetaRsp
metaRsp
=
{
0
};
if
(
NULL
==
output
||
NULL
==
msg
||
msgSize
<=
0
)
{
...
...
@@ -216,7 +216,7 @@ int32_t queryProcessTableMetaRsp(void *output, char *msg, int32_t msgSize) {
goto
PROCESS_META_OVER
;
}
if
(
!
tIsValidSchema
(
metaRsp
.
pSchemas
,
metaRsp
.
numOfColumns
,
metaRsp
.
numOfTags
))
{
if
(
0
!=
strcmp
(
metaRsp
.
dbFName
,
TSDB_INFORMATION_SCHEMA_DB
)
&&
!
tIsValidSchema
(
metaRsp
.
pSchemas
,
metaRsp
.
numOfColumns
,
metaRsp
.
numOfTags
))
{
code
=
TSDB_CODE_TSC_INVALID_VALUE
;
goto
PROCESS_META_OVER
;
}
...
...
@@ -254,11 +254,11 @@ PROCESS_META_OVER:
void
initQueryModuleMsgHandle
()
{
queryBuildMsg
[
TMSG_INDEX
(
TDMT_VND_TABLE_META
)]
=
queryBuildTableMetaReqMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_
STB
_META
)]
=
queryBuildTableMetaReqMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_
TABLE
_META
)]
=
queryBuildTableMetaReqMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_USE_DB
)]
=
queryBuildUseDbMsg
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_VND_TABLE_META
)]
=
queryProcessTableMetaRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_
STB
_META
)]
=
queryProcessTableMetaRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_
TABLE
_META
)]
=
queryProcessTableMetaRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_USE_DB
)]
=
queryProcessUseDBRsp
;
}
...
...
source/libs/scalar/inc/filterInt.h
浏览文件 @
b2ad12cb
...
...
@@ -253,6 +253,7 @@ typedef struct SFilterInfo {
uint32_t
*
blkUnits
;
int8_t
*
blkUnitRes
;
void
*
pTable
;
SArray
*
blkList
;
SFilterPCtx
pctx
;
}
SFilterInfo
;
...
...
source/libs/scalar/inc/sclInt.h
浏览文件 @
b2ad12cb
...
...
@@ -24,7 +24,7 @@ extern "C" {
typedef
struct
SScalarCtx
{
int32_t
code
;
S
SDataBlock
*
pSrc
;
S
Array
*
pBlockList
;
/* element is SSDataBlock* */
SHashObj
*
pRes
;
/* element is SScalarParam */
}
SScalarCtx
;
...
...
@@ -44,10 +44,13 @@ typedef struct SScalarCtx {
#define SCL_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
int32_t
sclMoveParamListData
(
SScalarParam
*
params
,
int32_t
listNum
,
int32_t
idx
);
bool
sclIsNull
(
SScalarParam
*
param
,
int32_t
idx
);
void
sclSetNull
(
SScalarParam
*
param
,
int32_t
idx
);
void
sclFreeParam
(
SScalarParam
*
param
);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_SCALARINT_H
\ No newline at end of file
#endif // TDENGINE_SCALARINT_H
source/libs/scalar/inc/sclvector.h
浏览文件 @
b2ad12cb
...
...
@@ -22,7 +22,11 @@ extern "C" {
#include "sclfunc.h"
typedef
void
(
*
_bin_scalar_fn_t
)(
SScalarParam
*
pLeft
,
SScalarParam
*
pRight
,
void
*
output
,
int32_t
order
);
typedef
double
(
*
_mathFunc
)(
double
,
double
,
bool
*
);
typedef
void
(
*
_bufConverteFunc
)(
char
*
buf
,
SScalarParam
*
pOut
,
int32_t
outType
);
typedef
void
(
*
_bin_scalar_fn_t
)(
SScalarParam
*
pLeft
,
SScalarParam
*
pRight
,
SScalarParam
*
output
,
int32_t
order
);
_bin_scalar_fn_t
getBinScalarOperatorFn
(
int32_t
binOperator
);
#ifdef __cplusplus
...
...
source/libs/scalar/src/filter.c
浏览文件 @
b2ad12cb
...
...
@@ -18,6 +18,7 @@
//#include "queryLog.h"
#include "tcompare.h"
#include "filterInt.h"
#include "sclInt.h"
#include "filter.h"
#include "tep.h"
...
...
@@ -1678,7 +1679,7 @@ void filterFreeInfo(SFilterInfo *info) {
tfree
(
info
->
cunits
);
tfree
(
info
->
blkUnitRes
);
tfree
(
info
->
blkUnits
);
for
(
int32_t
i
=
0
;
i
<
FLD_TYPE_MAX
;
++
i
)
{
for
(
uint32_t
f
=
0
;
f
<
info
->
fields
[
i
].
num
;
++
f
)
{
filterFreeField
(
&
info
->
fields
[
i
].
fields
[
f
],
i
);
...
...
@@ -2784,7 +2785,7 @@ bool filterExecuteBasedOnStatisImpl(void *pinfo, int32_t numOfRows, int8_t** p,
//} else {
uint8_t
optr
=
cunit
->
optr
;
if
(
isNull
(
colData
,
cunit
->
dataType
))
{
if
(
colDataIsNull
((
SColumnInfoData
*
)(
cunit
->
colData
),
0
,
i
,
NULL
))
{
(
*
p
)[
i
]
=
optr
==
OP_TYPE_IS_NULL
?
true
:
false
;
}
else
{
if
(
optr
==
OP_TYPE_IS_NOT_NULL
)
{
...
...
@@ -2881,12 +2882,12 @@ static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows,
(
*
p
)[
i
]
=
1
;
}
else
if
(
*
(
char
*
)
colData
==
TSDB_DATA_TYPE_JSON
){
// for json is null
colData
=
POINTER_SHIFT
(
colData
,
CHAR_BYTES
);
(
*
p
)[
i
]
=
isNull
(
colData
,
info
->
cunits
[
uidx
].
dataType
);
(
*
p
)[
i
]
=
colDataIsNull
((
SColumnInfoData
*
)
info
->
cunits
[
uidx
].
colData
,
0
,
i
,
NULL
);
}
else
{
(
*
p
)[
i
]
=
0
;
}
}
else
{
(
*
p
)[
i
]
=
((
colData
==
NULL
)
||
isNull
(
colData
,
info
->
cunits
[
uidx
].
dataType
));
(
*
p
)[
i
]
=
((
colData
==
NULL
)
||
colDataIsNull
((
SColumnInfoData
*
)
info
->
cunits
[
uidx
].
colData
,
0
,
i
,
NULL
));
}
if
((
*
p
)[
i
]
==
0
)
{
all
=
false
;
...
...
@@ -2916,12 +2917,12 @@ static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows
(
*
p
)[
i
]
=
0
;
}
else
if
(
*
(
char
*
)
colData
==
TSDB_DATA_TYPE_JSON
){
// for json is not null
colData
=
POINTER_SHIFT
(
colData
,
CHAR_BYTES
);
(
*
p
)[
i
]
=
!
isNull
(
colData
,
info
->
cunits
[
uidx
].
dataType
);
(
*
p
)[
i
]
=
!
colDataIsNull
((
SColumnInfoData
*
)
info
->
cunits
[
uidx
].
colData
,
0
,
i
,
NULL
);
}
else
{
// for json->'key' is not null
(
*
p
)[
i
]
=
1
;
}
}
else
{
(
*
p
)[
i
]
=
((
colData
!=
NULL
)
&&
!
isNull
(
colData
,
info
->
cunits
[
uidx
].
dataType
));
(
*
p
)[
i
]
=
((
colData
!=
NULL
)
&&
!
colDataIsNull
((
SColumnInfoData
*
)
info
->
cunits
[
uidx
].
colData
,
0
,
i
,
NULL
));
}
if
((
*
p
)[
i
]
==
0
)
{
...
...
@@ -2952,7 +2953,7 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SColumnD
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
++
i
)
{
void
*
colData
=
colDataGetData
((
SColumnInfoData
*
)
info
->
cunits
[
0
].
colData
,
i
);
if
(
colData
==
NULL
||
isNull
(
colData
,
info
->
cunits
[
0
].
dataType
))
{
if
(
colData
==
NULL
||
colDataIsNull
((
SColumnInfoData
*
)
info
->
cunits
[
0
].
colData
,
0
,
i
,
NULL
))
{
all
=
false
;
continue
;
}
...
...
@@ -2982,7 +2983,7 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDa
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
++
i
)
{
uint32_t
uidx
=
info
->
groups
[
0
].
unitIdxs
[
0
];
void
*
colData
=
colDataGetData
((
SColumnInfoData
*
)
info
->
cunits
[
uidx
].
colData
,
i
);
if
(
colData
==
NULL
||
isNull
(
colData
,
info
->
cunits
[
uidx
].
dataType
))
{
if
(
colData
==
NULL
||
colDataIsNull
((
SColumnInfoData
*
)
info
->
cunits
[
uidx
].
colData
,
0
,
i
,
NULL
))
{
(
*
p
)[
i
]
=
0
;
all
=
false
;
continue
;
...
...
@@ -3039,7 +3040,7 @@ bool filterExecuteImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAg
//} else {
uint8_t
optr
=
cunit
->
optr
;
if
(
colData
==
NULL
||
isNull
(
colData
,
cunit
->
dataType
))
{
if
(
colData
==
NULL
||
colDataIsNull
((
SColumnInfoData
*
)(
cunit
->
colData
),
0
,
i
,
NULL
))
{
(
*
p
)[
i
]
=
optr
==
OP_TYPE_IS_NULL
?
true
:
false
;
}
else
{
if
(
optr
==
OP_TYPE_IS_NOT_NULL
)
{
...
...
@@ -3669,9 +3670,17 @@ _return:
bool
filterExecute
(
SFilterInfo
*
info
,
SSDataBlock
*
pSrc
,
int8_t
**
p
,
SColumnDataAgg
*
statis
,
int16_t
numOfCols
)
{
if
(
info
->
scalarMode
)
{
SScalarParam
output
=
{
0
};
FLT_ERR_RET
(
scalarCalculate
(
info
->
sclCtx
.
node
,
pSrc
,
&
output
));
SArray
*
pList
=
taosArrayInit
(
1
,
POINTER_BYTES
);
taosArrayPush
(
pList
,
&
pSrc
);
FLT_ERR_RET
(
scalarCalculate
(
info
->
sclCtx
.
node
,
pList
,
&
output
));
taosArrayDestroy
(
pList
);
*
p
=
output
.
orig
.
data
;
output
.
orig
.
data
=
NULL
;
*
p
=
output
.
data
;
sclFreeParam
(
&
output
)
;
int8_t
*
r
=
output
.
data
;
for
(
int32_t
i
=
0
;
i
<
output
.
num
;
++
i
)
{
...
...
source/libs/scalar/src/scalar.c
浏览文件 @
b2ad12cb
...
...
@@ -5,6 +5,7 @@
#include "functionMgt.h"
#include "sclvector.h"
#include "sclInt.h"
#include "tep.h"
int32_t
scalarGetOperatorParamNum
(
EOperatorType
type
)
{
if
(
OP_TYPE_IS_NULL
==
type
||
OP_TYPE_IS_NOT_NULL
==
type
||
OP_TYPE_IS_TRUE
==
type
||
OP_TYPE_IS_NOT_TRUE
==
type
...
...
@@ -87,6 +88,26 @@ _return:
SCL_RET
(
code
);
}
FORCE_INLINE
bool
sclIsNull
(
SScalarParam
*
param
,
int32_t
idx
)
{
if
(
param
->
dataInBlock
)
{
return
colDataIsNull
(
param
->
orig
.
columnData
,
0
,
idx
,
NULL
);
}
return
param
->
bitmap
?
colDataIsNull_f
(
param
->
bitmap
,
idx
)
:
false
;
}
FORCE_INLINE
void
sclSetNull
(
SScalarParam
*
param
,
int32_t
idx
)
{
if
(
NULL
==
param
->
bitmap
)
{
param
->
bitmap
=
calloc
(
BitmapLen
(
param
->
num
),
sizeof
(
char
));
if
(
NULL
==
param
->
bitmap
)
{
sclError
(
"calloc %d failed"
,
param
->
num
);
return
;
}
}
colDataSetNull_f
(
param
->
bitmap
,
idx
);
}
void
sclFreeRes
(
SHashObj
*
res
)
{
SScalarParam
*
p
=
NULL
;
...
...
@@ -95,7 +116,7 @@ void sclFreeRes(SHashObj *res) {
p
=
(
SScalarParam
*
)
pIter
;
if
(
p
)
{
tfree
(
p
->
data
);
sclFreeParam
(
p
);
}
pIter
=
taosHashIterate
(
res
,
pIter
);
...
...
@@ -104,19 +125,54 @@ void sclFreeRes(SHashObj *res) {
taosHashCleanup
(
res
);
}
void
sclFreeParamNoData
(
SScalarParam
*
param
)
{
tfree
(
param
->
bitmap
);
}
void
sclFreeParam
(
SScalarParam
*
param
)
{
tfree
(
param
->
data
);
sclFreeParamNoData
(
param
);
if
(
!
param
->
dataInBlock
)
{
if
(
SCL_DATA_TYPE_DUMMY_HASH
==
param
->
type
)
{
taosHashCleanup
((
SHashObj
*
)
param
->
orig
.
data
);
}
else
{
tfree
(
param
->
orig
.
data
);
}
}
}
int32_t
sclCopyValueNodeValue
(
SValueNode
*
pNode
,
void
**
res
)
{
if
(
TSDB_DATA_TYPE_NULL
==
pNode
->
node
.
resType
.
type
)
{
return
TSDB_CODE_SUCCESS
;
}
*
res
=
malloc
(
pNode
->
node
.
resType
.
bytes
);
if
(
NULL
==
(
*
res
))
{
sclError
(
"malloc %d failed"
,
pNode
->
node
.
resType
.
bytes
);
SCL_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
memcpy
(
*
res
,
nodesGetValueFromNode
(
pNode
),
pNode
->
node
.
resType
.
bytes
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
sclInitParam
(
SNode
*
node
,
SScalarParam
*
param
,
SScalarCtx
*
ctx
,
int32_t
*
rowNum
)
{
switch
(
nodeType
(
node
))
{
case
QUERY_NODE_VALUE
:
{
SValueNode
*
valueNode
=
(
SValueNode
*
)
node
;
//SCL_ERR_RET(sclCopyValueNodeValue(valueNode, ¶m->data));
param
->
data
=
nodesGetValueFromNode
(
valueNode
);
param
->
orig
.
data
=
param
->
data
;
param
->
num
=
1
;
param
->
type
=
valueNode
->
node
.
resType
.
type
;
param
->
bytes
=
valueNode
->
node
.
resType
.
bytes
;
param
->
colData
=
false
;
if
(
TSDB_DATA_TYPE_NULL
==
param
->
type
)
{
sclSetNull
(
param
,
0
);
}
param
->
dataInBlock
=
false
;
break
;
}
...
...
@@ -128,34 +184,44 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
}
SCL_ERR_RET
(
scalarGenerateSetFromList
(
&
param
->
data
,
node
,
nodeList
->
dataType
.
type
));
param
->
orig
.
data
=
param
->
data
;
param
->
num
=
1
;
param
->
type
=
SCL_DATA_TYPE_DUMMY_HASH
;
param
->
colData
=
false
;
param
->
dataInBlock
=
false
;
if
(
taosHashPut
(
ctx
->
pRes
,
&
node
,
POINTER_BYTES
,
param
,
sizeof
(
*
param
)))
{
taosHashCleanup
(
param
->
orig
.
data
);
param
->
orig
.
data
=
NULL
;
sclError
(
"taosHashPut nodeList failed, size:%d"
,
(
int32_t
)
sizeof
(
*
param
));
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
break
;
}
case
QUERY_NODE_COLUMN
:
{
if
(
NULL
==
ctx
)
{
sclError
(
"invalid node type for constant calculating, type:%d,
ctx:%p"
,
nodeType
(
node
),
ctx
);
if
(
NULL
==
ctx
->
pBlockList
)
{
sclError
(
"invalid node type for constant calculating, type:%d,
src:%p"
,
nodeType
(
node
),
ctx
->
pBlockList
);
SCL_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
SColumnNode
*
ref
=
(
SColumnNode
*
)
node
;
if
(
ref
->
slotId
>=
taosArrayGetSize
(
ctx
->
pSrc
->
pDataBlock
))
{
sclError
(
"column
ref slotId is too big, slodId:%d, dataBlockSize:%d"
,
ref
->
slotId
,
(
int32_t
)
taosArrayGetSize
(
ctx
->
pSrc
->
pDataBlock
));
if
(
ref
->
dataBlockId
>=
taosArrayGetSize
(
ctx
->
pBlockList
))
{
sclError
(
"column
tupleId is too big, tupleId:%d, dataBlockNum:%d"
,
ref
->
dataBlockId
,
(
int32_t
)
taosArrayGetSize
(
ctx
->
pBlockList
));
SCL_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SColumnInfoData
*
columnData
=
(
SColumnInfoData
*
)
taosArrayGet
(
ctx
->
pSrc
->
pDataBlock
,
ref
->
slotId
);
if
(
IS_VAR_DATA_TYPE
(
columnData
->
info
.
type
))
{
param
->
data
=
columnData
;
param
->
colData
=
true
;
}
else
{
param
->
data
=
columnData
->
pData
;
param
->
colData
=
false
;
SSDataBlock
*
block
=
*
(
SSDataBlock
**
)
taosArrayGet
(
ctx
->
pBlockList
,
ref
->
dataBlockId
);
if
(
NULL
==
block
||
ref
->
slotId
>=
taosArrayGetSize
(
block
->
pDataBlock
))
{
sclError
(
"column slotId is too big, slodId:%d, dataBlockSize:%d"
,
ref
->
slotId
,
(
int32_t
)
taosArrayGetSize
(
block
->
pDataBlock
));
SCL_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SColumnInfoData
*
columnData
=
(
SColumnInfoData
*
)
taosArrayGet
(
block
->
pDataBlock
,
ref
->
slotId
);
param
->
data
=
NULL
;
param
->
orig
.
columnData
=
columnData
;
param
->
dataInBlock
=
true
;
param
->
num
=
ctx
->
pSrc
->
info
.
rows
;
param
->
num
=
block
->
info
.
rows
;
param
->
type
=
columnData
->
info
.
type
;
param
->
bytes
=
columnData
->
info
.
bytes
;
...
...
@@ -163,11 +229,6 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
}
case
QUERY_NODE_LOGIC_CONDITION
:
case
QUERY_NODE_OPERATOR
:
{
if
(
NULL
==
ctx
)
{
sclError
(
"invalid node type for constant calculating, type:%d, ctx:%p"
,
nodeType
(
node
),
ctx
);
SCL_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
SScalarParam
*
res
=
(
SScalarParam
*
)
taosHashGet
(
ctx
->
pRes
,
&
node
,
POINTER_BYTES
);
if
(
NULL
==
res
)
{
sclError
(
"no result for node, type:%d, node:%p"
,
nodeType
(
node
),
node
);
...
...
@@ -192,20 +253,26 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
return
TSDB_CODE_SUCCESS
;
}
int32_t
scl
ParamMoveNext
(
SScalarParam
*
params
,
int32_t
num
)
{
int32_t
scl
MoveParamListData
(
SScalarParam
*
params
,
int32_t
listNum
,
int32_t
idx
)
{
SScalarParam
*
param
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
n
um
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
listN
um
;
++
i
)
{
param
=
params
+
i
;
if
(
1
==
param
->
num
)
{
continue
;
}
if
(
IS_VAR_DATA_TYPE
(
param
->
type
))
{
param
->
data
=
(
char
*
)(
param
->
data
)
+
varDataTLen
(
param
->
data
);
if
(
param
->
dataInBlock
)
{
param
->
data
=
colDataGetData
(
param
->
orig
.
columnData
,
idx
);
}
else
if
(
idx
)
{
if
(
IS_VAR_DATA_TYPE
(
param
->
type
))
{
param
->
data
=
(
char
*
)(
param
->
data
)
+
varDataTLen
(
param
->
data
);
}
else
{
param
->
data
=
(
char
*
)(
param
->
data
)
+
tDataTypes
[
param
->
type
].
bytes
;
}
}
else
{
param
->
data
=
(
char
*
)(
param
->
data
)
+
tDataTypes
[
param
->
type
].
bytes
;
param
->
data
=
param
->
orig
.
data
;
}
}
...
...
@@ -281,8 +348,7 @@ int32_t sclExecFuncion(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outpu
SScalarFuncExecFuncs
ffpSet
=
{
0
};
int32_t
code
=
fmGetScalarFuncExecFuncs
(
node
->
funcId
,
&
ffpSet
);
if
(
code
)
{
sclError
(
"fmGetFuncExecFuncs failed, funcId:%d, code:%s"
,
node
->
funcId
,
tstrerror
(
code
));
sclError
(
"fmGetFuncExecFuncs failed, funcId:%d, code:%s"
,
node
->
funcId
,
tstrerror
(
code
));
SCL_ERR_RET
(
code
);
}
...
...
@@ -296,24 +362,27 @@ int32_t sclExecFuncion(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outpu
sclError
(
"calloc %d failed"
,
(
int32_t
)(
rowNum
*
sizeof
(
tDataTypes
[
output
->
type
].
bytes
)));
SCL_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
output
->
orig
.
data
=
output
->
data
;
for
(
int32_t
i
=
0
;
i
<
rowNum
;
++
i
)
{
sclMoveParamListData
(
output
,
1
,
i
);
sclMoveParamListData
(
params
,
node
->
pParameterList
->
length
,
i
);
code
=
(
*
ffpSet
.
process
)(
params
,
node
->
pParameterList
->
length
,
output
);
if
(
code
)
{
sclError
(
"scalar function exec failed, funcId:%d, code:%s"
,
node
->
funcId
,
tstrerror
(
code
));
sclError
(
"scalar function exec failed, funcId:%d, code:%s"
,
node
->
funcId
,
tstrerror
(
code
));
SCL_ERR_JRET
(
code
);
}
sclParamMoveNext
(
output
,
1
);
sclParamMoveNext
(
params
,
node
->
pParameterList
->
length
);
}
return
TSDB_CODE_SUCCESS
;
_return:
for
(
int32_t
i
=
0
;
i
<
node
->
pParameterList
->
length
;
++
i
)
{
sclFreeParamNoData
(
params
+
i
);
}
tfree
(
params
);
SCL_RET
(
code
);
}
...
...
@@ -348,12 +417,14 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
sclError
(
"calloc %d failed"
,
(
int32_t
)(
rowNum
*
sizeof
(
bool
)));
SCL_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
output
->
orig
.
data
=
output
->
data
;
void
*
data
=
output
->
data
;
bool
value
=
false
;
for
(
int32_t
i
=
0
;
i
<
rowNum
;
++
i
)
{
sclMoveParamListData
(
output
,
1
,
i
);
sclMoveParamListData
(
params
,
node
->
pParameterList
->
length
,
i
);
for
(
int32_t
m
=
0
;
m
<
node
->
pParameterList
->
length
;
++
m
)
{
GET_TYPED_DATA
(
value
,
bool
,
params
[
m
].
type
,
params
[
m
].
data
);
...
...
@@ -367,17 +438,14 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
}
*
(
bool
*
)
output
->
data
=
value
;
sclParamMoveNext
(
output
,
1
);
sclParamMoveNext
(
params
,
node
->
pParameterList
->
length
);
}
output
->
data
=
data
;
return
TSDB_CODE_SUCCESS
;
_return:
for
(
int32_t
i
=
0
;
i
<
node
->
pParameterList
->
length
;
++
i
)
{
sclFreeParamNoData
(
params
+
i
);
}
tfree
(
params
);
SCL_RET
(
code
);
}
...
...
@@ -397,6 +465,7 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp
sclError
(
"calloc %d failed"
,
(
int32_t
)
rowNum
*
tDataTypes
[
output
->
type
].
bytes
);
SCL_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
output
->
orig
.
data
=
output
->
data
;
_bin_scalar_fn_t
OperatorFn
=
getBinScalarOperatorFn
(
node
->
opType
);
...
...
@@ -404,23 +473,27 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp
SScalarParam
*
pLeft
=
&
params
[
0
];
SScalarParam
*
pRight
=
paramNum
>
1
?
&
params
[
1
]
:
NULL
;
OperatorFn
(
pLeft
,
pRight
,
output
->
data
,
TSDB_ORDER_ASC
);
return
TSDB_CODE_SUCCESS
;
OperatorFn
(
pLeft
,
pRight
,
output
,
TSDB_ORDER_ASC
);
_return:
for
(
int32_t
i
=
0
;
i
<
paramNum
;
++
i
)
{
sclFreeParamNoData
(
params
+
i
);
}
tfree
(
params
);
SCL_RET
(
code
);
}
EDealRes
sclRewriteFunction
(
SNode
**
pNode
,
void
*
pContext
)
{
EDealRes
sclRewriteFunction
(
SNode
**
pNode
,
SScalarCtx
*
ctx
)
{
SFunctionNode
*
node
=
(
SFunctionNode
*
)
*
pNode
;
SScalarParam
output
=
{
0
};
*
(
int32_t
*
)
pContext
=
sclExecFuncion
(
node
,
NULL
,
&
output
);
if
(
*
(
int32_t
*
)
pContext
)
{
ctx
->
code
=
sclExecFuncion
(
node
,
ctx
,
&
output
);
if
(
ctx
->
code
)
{
return
DEAL_RES_ERROR
;
}
...
...
@@ -428,7 +501,7 @@ EDealRes sclRewriteFunction(SNode** pNode, void* pContext) {
if
(
NULL
==
res
)
{
sclError
(
"make value node failed"
);
sclFreeParam
(
&
output
);
*
(
int32_t
*
)
pContext
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
ctx
->
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
DEAL_RES_ERROR
;
}
...
...
@@ -449,12 +522,12 @@ EDealRes sclRewriteFunction(SNode** pNode, void* pContext) {
return
DEAL_RES_CONTINUE
;
}
EDealRes
sclRewriteLogic
(
SNode
**
pNode
,
void
*
pContext
)
{
EDealRes
sclRewriteLogic
(
SNode
**
pNode
,
SScalarCtx
*
ctx
)
{
SLogicConditionNode
*
node
=
(
SLogicConditionNode
*
)
*
pNode
;
SScalarParam
output
=
{
0
};
*
(
int32_t
*
)
pContext
=
sclExecLogic
(
node
,
NULL
,
&
output
);
if
(
*
(
int32_t
*
)
pContext
)
{
ctx
->
code
=
sclExecLogic
(
node
,
ctx
,
&
output
);
if
(
ctx
->
code
)
{
return
DEAL_RES_ERROR
;
}
...
...
@@ -462,7 +535,7 @@ EDealRes sclRewriteLogic(SNode** pNode, void* pContext) {
if
(
NULL
==
res
)
{
sclError
(
"make value node failed"
);
sclFreeParam
(
&
output
);
*
(
int32_t
*
)
pContext
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
ctx
->
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
DEAL_RES_ERROR
;
}
...
...
@@ -483,12 +556,12 @@ EDealRes sclRewriteLogic(SNode** pNode, void* pContext) {
return
DEAL_RES_CONTINUE
;
}
EDealRes
sclRewriteOperator
(
SNode
**
pNode
,
void
*
pContext
)
{
EDealRes
sclRewriteOperator
(
SNode
**
pNode
,
SScalarCtx
*
ctx
)
{
SOperatorNode
*
node
=
(
SOperatorNode
*
)
*
pNode
;
SScalarParam
output
=
{
0
};
*
(
int32_t
*
)
pContext
=
sclExecOperator
(
node
,
NULL
,
&
output
);
if
(
*
(
int32_t
*
)
pContext
)
{
ctx
->
code
=
sclExecOperator
(
node
,
ctx
,
&
output
);
if
(
ctx
->
code
)
{
return
DEAL_RES_ERROR
;
}
...
...
@@ -496,7 +569,7 @@ EDealRes sclRewriteOperator(SNode** pNode, void* pContext) {
if
(
NULL
==
res
)
{
sclError
(
"make value node failed"
);
sclFreeParam
(
&
output
);
*
(
int32_t
*
)
pContext
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
ctx
->
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
DEAL_RES_ERROR
;
}
...
...
@@ -523,28 +596,29 @@ EDealRes sclConstantsRewriter(SNode** pNode, void* pContext) {
return
DEAL_RES_CONTINUE
;
}
SScalarCtx
*
ctx
=
(
SScalarCtx
*
)
pContext
;
if
(
QUERY_NODE_FUNCTION
==
nodeType
(
*
pNode
))
{
return
sclRewriteFunction
(
pNode
,
pContext
);
return
sclRewriteFunction
(
pNode
,
ctx
);
}
if
(
QUERY_NODE_LOGIC_CONDITION
==
nodeType
(
*
pNode
))
{
return
sclRewriteLogic
(
pNode
,
pContext
);
return
sclRewriteLogic
(
pNode
,
ctx
);
}
if
(
QUERY_NODE_OPERATOR
==
nodeType
(
*
pNode
))
{
return
sclRewriteOperator
(
pNode
,
pContext
);
return
sclRewriteOperator
(
pNode
,
ctx
);
}
sclError
(
"invalid node type for calculating constants, type:%d"
,
nodeType
(
*
pNode
));
*
(
int32_t
*
)
pContext
=
TSDB_CODE_QRY_INVALID_INPUT
;
ctx
->
code
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
DEAL_RES_ERROR
;
}
EDealRes
sclWalkFunction
(
SNode
*
pNode
,
void
*
pContext
)
{
SScalarCtx
*
ctx
=
(
SScalarCtx
*
)
pContext
;
EDealRes
sclWalkFunction
(
SNode
*
pNode
,
SScalarCtx
*
ctx
)
{
SFunctionNode
*
node
=
(
SFunctionNode
*
)
pNode
;
SScalarParam
output
=
{
0
};
...
...
@@ -562,8 +636,7 @@ EDealRes sclWalkFunction(SNode* pNode, void* pContext) {
}
EDealRes
sclWalkLogic
(
SNode
*
pNode
,
void
*
pContext
)
{
SScalarCtx
*
ctx
=
(
SScalarCtx
*
)
pContext
;
EDealRes
sclWalkLogic
(
SNode
*
pNode
,
SScalarCtx
*
ctx
)
{
SLogicConditionNode
*
node
=
(
SLogicConditionNode
*
)
pNode
;
SScalarParam
output
=
{
0
};
...
...
@@ -581,8 +654,7 @@ EDealRes sclWalkLogic(SNode* pNode, void* pContext) {
}
EDealRes
sclWalkOperator
(
SNode
*
pNode
,
void
*
pContext
)
{
SScalarCtx
*
ctx
=
(
SScalarCtx
*
)
pContext
;
EDealRes
sclWalkOperator
(
SNode
*
pNode
,
SScalarCtx
*
ctx
)
{
SOperatorNode
*
node
=
(
SOperatorNode
*
)
pNode
;
SScalarParam
output
=
{
0
};
...
...
@@ -599,27 +671,69 @@ EDealRes sclWalkOperator(SNode* pNode, void* pContext) {
return
DEAL_RES_CONTINUE
;
}
EDealRes
sclWalkTarget
(
SNode
*
pNode
,
SScalarCtx
*
ctx
)
{
STargetNode
*
target
=
(
STargetNode
*
)
pNode
;
if
(
target
->
dataBlockId
>=
taosArrayGetSize
(
ctx
->
pBlockList
))
{
sclError
(
"target tupleId is too big, tupleId:%d, dataBlockNum:%d"
,
target
->
dataBlockId
,
(
int32_t
)
taosArrayGetSize
(
ctx
->
pBlockList
));
ctx
->
code
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
DEAL_RES_ERROR
;
}
SSDataBlock
*
block
=
*
(
SSDataBlock
**
)
taosArrayGet
(
ctx
->
pBlockList
,
target
->
dataBlockId
);
if
(
target
->
slotId
>=
taosArrayGetSize
(
block
->
pDataBlock
))
{
sclError
(
"target slot not exist, dataBlockId:%d, slotId:%d, dataBlockNum:%d"
,
target
->
dataBlockId
,
target
->
slotId
,
(
int32_t
)
taosArrayGetSize
(
block
->
pDataBlock
));
ctx
->
code
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
DEAL_RES_ERROR
;
}
SColumnInfoData
*
col
=
taosArrayGet
(
block
->
pDataBlock
,
target
->
slotId
);
SScalarParam
*
res
=
(
SScalarParam
*
)
taosHashGet
(
ctx
->
pRes
,
(
void
*
)
&
target
->
pExpr
,
POINTER_BYTES
);
if
(
NULL
==
res
)
{
sclError
(
"no valid res in hash, node:%p, type:%d"
,
target
->
pExpr
,
nodeType
(
target
->
pExpr
));
ctx
->
code
=
TSDB_CODE_QRY_APP_ERROR
;
return
DEAL_RES_ERROR
;
}
for
(
int32_t
i
=
0
;
i
<
res
->
num
;
++
i
)
{
sclMoveParamListData
(
res
,
1
,
i
);
colDataAppend
(
col
,
i
,
res
->
data
,
sclIsNull
(
res
,
i
));
}
sclFreeParam
(
res
);
taosHashRemove
(
ctx
->
pRes
,
(
void
*
)
&
target
->
pExpr
,
POINTER_BYTES
);
return
DEAL_RES_CONTINUE
;
}
EDealRes
sclCalcWalker
(
SNode
*
pNode
,
void
*
pContext
)
{
if
(
QUERY_NODE_VALUE
==
nodeType
(
pNode
)
||
QUERY_NODE_NODE_LIST
==
nodeType
(
pNode
)
||
QUERY_NODE_COLUMN
==
nodeType
(
pNode
))
{
return
DEAL_RES_CONTINUE
;
}
SScalarCtx
*
ctx
=
(
SScalarCtx
*
)
pContext
;
if
(
QUERY_NODE_FUNCTION
==
nodeType
(
pNode
))
{
return
sclWalkFunction
(
pNode
,
pContext
);
return
sclWalkFunction
(
pNode
,
ctx
);
}
if
(
QUERY_NODE_LOGIC_CONDITION
==
nodeType
(
pNode
))
{
return
sclWalkLogic
(
pNode
,
pContext
);
return
sclWalkLogic
(
pNode
,
ctx
);
}
if
(
QUERY_NODE_OPERATOR
==
nodeType
(
pNode
))
{
return
sclWalkOperator
(
pNode
,
pContext
);
return
sclWalkOperator
(
pNode
,
ctx
);
}
sclError
(
"invalid node type for scalar calculating, type:%d"
,
nodeType
(
pNode
));
if
(
QUERY_NODE_TARGET
==
nodeType
(
pNode
))
{
return
sclWalkTarget
(
pNode
,
ctx
);
}
SScalarCtx
*
ctx
=
(
SScalarCtx
*
)
pContext
;
sclError
(
"invalid node type for scalar calculating, type:%d"
,
nodeType
(
pNode
))
;
ctx
->
code
=
TSDB_CODE_QRY_INVALID_INPUT
;
...
...
@@ -634,26 +748,33 @@ int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) {
}
int32_t
code
=
0
;
SScalarCtx
ctx
=
{
0
};
ctx
.
pRes
=
taosHashInit
(
SCL_DEFAULT_OP_NUM
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
if
(
NULL
==
ctx
.
pRes
)
{
sclError
(
"taosHashInit failed, num:%d"
,
SCL_DEFAULT_OP_NUM
);
SCL_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
nodesRewriteNodePostOrder
(
&
pNode
,
sclConstantsRewriter
,
(
void
*
)
&
c
ode
);
nodesRewriteNodePostOrder
(
&
pNode
,
sclConstantsRewriter
,
(
void
*
)
&
c
tx
);
if
(
code
)
{
nodesDestroyNode
(
pNode
);
SCL_ERR_RET
(
code
);
}
SCL_ERR_JRET
(
ctx
.
code
);
*
pRes
=
pNode
;
SCL_RET
(
code
);
_return:
sclFreeRes
(
ctx
.
pRes
);
return
code
;
}
int32_t
scalarCalculate
(
SNode
*
pNode
,
S
SDataBlock
*
pSrc
,
SScalarParam
*
pDst
)
{
if
(
NULL
==
pNode
||
NULL
==
p
Src
||
NULL
==
pD
st
)
{
int32_t
scalarCalculate
(
SNode
*
pNode
,
S
Array
*
pBlockList
,
SScalarParam
*
pDst
)
{
if
(
NULL
==
pNode
||
NULL
==
p
BlockLi
st
)
{
SCL_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
int32_t
code
=
0
;
SScalarCtx
ctx
=
{.
code
=
0
,
.
p
Src
=
pSrc
};
SScalarCtx
ctx
=
{.
code
=
0
,
.
p
BlockList
=
pBlockList
};
ctx
.
pRes
=
taosHashInit
(
SCL_DEFAULT_OP_NUM
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
if
(
NULL
==
ctx
.
pRes
)
{
...
...
@@ -663,23 +784,28 @@ int32_t scalarCalculate(SNode *pNode, SSDataBlock *pSrc, SScalarParam *pDst) {
nodesWalkNodePostOrder
(
pNode
,
sclCalcWalker
,
(
void
*
)
&
ctx
);
if
(
ctx
.
code
)
{
nodesDestroyNode
(
pNode
);
sclFreeRes
(
ctx
.
pRes
);
SCL_ERR_RET
(
ctx
.
code
);
}
SCL_ERR_JRET
(
ctx
.
code
);
SScalarParam
*
res
=
(
SScalarParam
*
)
taosHashGet
(
ctx
.
pRes
,
(
void
*
)
&
pNode
,
POINTER_BYTES
);
if
(
NULL
==
res
)
{
sclError
(
"no res for calculating, node:%p, type:%d"
,
pNode
,
nodeType
(
pNode
));
SCL_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
if
(
pDst
)
{
SScalarParam
*
res
=
(
SScalarParam
*
)
taosHashGet
(
ctx
.
pRes
,
(
void
*
)
&
pNode
,
POINTER_BYTES
);
if
(
NULL
==
res
)
{
sclError
(
"no valid res in hash, node:%p, type:%d"
,
pNode
,
nodeType
(
pNode
));
SCL_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
sclMoveParamListData
(
res
,
1
,
0
);
*
pDst
=
*
res
;
taosHashRemove
(
ctx
.
pRes
,
(
void
*
)
&
pNode
,
POINTER_BYTES
);
}
*
pDst
=
*
res
;
nodesDestroyNode
(
pNode
);
_return:
//nodesDestroyNode(pNode);
sclFreeRes
(
ctx
.
pRes
);
return
TSDB_CODE_SUCCESS
;
return
code
;
}
...
...
source/libs/scalar/src/sclvector.c
浏览文件 @
b2ad12cb
此差异已折叠。
点击以展开。
source/libs/scalar/test/filter/filterTests.cpp
浏览文件 @
b2ad12cb
此差异已折叠。
点击以展开。
source/libs/scalar/test/scalar/scalarTests.cpp
浏览文件 @
b2ad12cb
此差异已折叠。
点击以展开。
source/util/src/terror.c
浏览文件 @
b2ad12cb
...
...
@@ -243,6 +243,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_COLUMNS, "Too many columns")
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_COLUMN_ALREADY_EXIST
,
"Column already exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_COLUMN_NOT_EXIST
,
"Column does not exist"
)
// mnode-infoSchema
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_INVALID_INFOS_TBL
,
"Invalid information schema table name"
)
// mnode-func
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_FUNC_ALREADY_EXIST
,
"Func already exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_FUNC_NOT_EXIST
,
"Func not exists"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录