Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
924e3ad6
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
924e3ad6
编写于
12月 15, 2021
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
use db to get vgroup list and db info
上级
fb035bb3
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
297 addition
and
31 deletion
+297
-31
include/common/taosmsg.h
include/common/taosmsg.h
+23
-2
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+1
-7
include/libs/query/query.h
include/libs/query/query.h
+16
-0
include/os/osMemory.h
include/os/osMemory.h
+2
-2
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+1
-0
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+103
-13
source/libs/parser/CMakeLists.txt
source/libs/parser/CMakeLists.txt
+2
-2
source/libs/parser/test/CMakeLists.txt
source/libs/parser/test/CMakeLists.txt
+1
-1
source/libs/planner/CMakeLists.txt
source/libs/planner/CMakeLists.txt
+2
-2
source/libs/planner/test/CMakeLists.txt
source/libs/planner/test/CMakeLists.txt
+1
-1
source/libs/query/src/querymsg.c
source/libs/query/src/querymsg.c
+145
-1
未找到文件。
include/common/taosmsg.h
浏览文件 @
924e3ad6
...
@@ -219,6 +219,13 @@ typedef struct SBuildTableMetaInput {
...
@@ -219,6 +219,13 @@ typedef struct SBuildTableMetaInput {
char
*
tableFullName
;
char
*
tableFullName
;
}
SBuildTableMetaInput
;
}
SBuildTableMetaInput
;
typedef
struct
SBuildUseDBInput
{
char
db
[
TSDB_TABLE_FNAME_LEN
];
int32_t
vgroupVersion
;
int32_t
dbGroupVersion
;
}
SBuildUseDBInput
;
#pragma pack(push, 1)
#pragma pack(push, 1)
// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta
// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta
...
@@ -617,9 +624,12 @@ typedef struct {
...
@@ -617,9 +624,12 @@ typedef struct {
typedef
struct
{
typedef
struct
{
char
db
[
TSDB_TABLE_FNAME_LEN
];
char
db
[
TSDB_TABLE_FNAME_LEN
];
int8_t
ignoreNotExists
;
int8_t
ignoreNotExists
;
int32_t
vgroupVersion
;
int32_t
dbGroupVersion
;
int32_t
reserve
[
8
];
int32_t
reserve
[
8
];
}
SUseDbMsg
;
}
SUseDbMsg
;
typedef
struct
{
typedef
struct
{
char
db
[
TSDB_TABLE_FNAME_LEN
];
char
db
[
TSDB_TABLE_FNAME_LEN
];
int32_t
reserve
[
8
];
int32_t
reserve
[
8
];
...
@@ -806,8 +816,6 @@ typedef struct SVgroupListRspMsg {
...
@@ -806,8 +816,6 @@ typedef struct SVgroupListRspMsg {
SVgroupInfo
vgroupInfo
[];
SVgroupInfo
vgroupInfo
[];
}
SVgroupListRspMsg
;
}
SVgroupListRspMsg
;
typedef
SVgroupListRspMsg
SVgroupListInfo
;
typedef
struct
{
typedef
struct
{
int32_t
vgId
;
int32_t
vgId
;
int8_t
numOfEps
;
int8_t
numOfEps
;
...
@@ -852,6 +860,19 @@ typedef struct {
...
@@ -852,6 +860,19 @@ typedef struct {
char
*
data
;
char
*
data
;
}
STagData
;
}
STagData
;
typedef
struct
{
int32_t
vgroupNum
;
int32_t
vgroupVersion
;
char
db
[
TSDB_TABLE_FNAME_LEN
];
int32_t
dbVgroupVersion
;
int32_t
dbVgroupNum
;
int32_t
dbHashRange
;
SVgroupInfo
vgroupInfo
[];
//int32_t vgIdList[];
}
SUseDbRspMsg
;
/*
/*
* sql: show tables like '%a_%'
* sql: show tables like '%a_%'
* payload is the query condition, e.g., '%a_%'
* payload is the query condition, e.g., '%a_%'
...
...
include/libs/catalog/catalog.h
浏览文件 @
924e3ad6
...
@@ -27,16 +27,10 @@ extern "C" {
...
@@ -27,16 +27,10 @@ extern "C" {
#include "transport.h"
#include "transport.h"
#include "common.h"
#include "common.h"
#include "taosmsg.h"
#include "taosmsg.h"
#include "query.h"
struct
SCatalog
;
struct
SCatalog
;
typedef
struct
SDBVgroupInfo
{
int32_t
vgroupVersion
;
SArray
*
vgId
;
int32_t
hashRange
;
int32_t
hashNum
;
}
SDBVgroupInfo
;
typedef
struct
SCatalogReq
{
typedef
struct
SCatalogReq
{
char
clusterId
[
TSDB_CLUSTER_ID_LEN
];
//????
char
clusterId
[
TSDB_CLUSTER_ID_LEN
];
//????
SArray
*
pTableName
;
// table full name
SArray
*
pTableName
;
// table full name
...
...
include/libs/query/query.h
浏览文件 @
924e3ad6
...
@@ -20,6 +20,22 @@
...
@@ -20,6 +20,22 @@
extern
"C"
{
extern
"C"
{
#endif
#endif
#include "tarray.h"
typedef
SVgroupListRspMsg
SVgroupListInfo
;
typedef
struct
SDBVgroupInfo
{
int32_t
vgroupVersion
;
SArray
*
vgId
;
int32_t
hashRange
;
}
SDBVgroupInfo
;
typedef
struct
SUseDbOutput
{
SVgroupListInfo
*
vgroupList
;
char
db
[
TSDB_TABLE_FNAME_LEN
];
SDBVgroupInfo
*
dbVgroup
;
}
SUseDbOutput
;
extern
int32_t
(
*
queryBuildMsg
[
TSDB_MSG_TYPE_MAX
])(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
);
extern
int32_t
(
*
queryBuildMsg
[
TSDB_MSG_TYPE_MAX
])(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
);
extern
int32_t
(
*
queryProcessMsgRsp
[
TSDB_MSG_TYPE_MAX
])(
void
*
output
,
char
*
msg
,
int32_t
msgSize
);
extern
int32_t
(
*
queryProcessMsgRsp
[
TSDB_MSG_TYPE_MAX
])(
void
*
output
,
char
*
msg
,
int32_t
msgSize
);
...
...
include/os/osMemory.h
浏览文件 @
924e3ad6
...
@@ -23,8 +23,8 @@ extern "C" {
...
@@ -23,8 +23,8 @@ extern "C" {
#define tfree(x) \
#define tfree(x) \
do { \
do { \
if (x) { \
if (x) { \
free((void *)
x
); \
free((void *)
(x)
); \
x
= 0; \
(x)
= 0; \
} \
} \
} while (0)
} while (0)
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
924e3ad6
...
@@ -26,6 +26,7 @@ extern "C" {
...
@@ -26,6 +26,7 @@ extern "C" {
#define CTG_DEFAULT_CLUSTER_NUMBER 6
#define CTG_DEFAULT_CLUSTER_NUMBER 6
#define CTG_DEFAULT_VGROUP_NUMBER 100
#define CTG_DEFAULT_VGROUP_NUMBER 100
#define CTG_DEFAULT_DB_NUMBER 20
#define CTG_DEFAULT_INVALID_VERSION (-1)
#define CTG_DEFAULT_INVALID_VERSION (-1)
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
924e3ad6
...
@@ -63,21 +63,69 @@ int32_t ctgGetVgroupFromCache(SCatalog* pCatalog, SArray** pVgroupList, int32_t*
...
@@ -63,21 +63,69 @@ int32_t ctgGetVgroupFromCache(SCatalog* pCatalog, SArray** pVgroupList, int32_t*
}
}
int32_t
ctgGetDBVgroupFromCache
(
SCatalog
*
pCatalog
,
char
*
dbName
,
SDBVgroupInfo
**
dbInfo
,
int32_t
*
exist
)
{
int32_t
ctgGetDBVgroupFromCache
(
SCatalog
*
pCatalog
,
const
char
*
dbName
,
SDBVgroupInfo
**
dbInfo
,
int32_t
*
exist
)
{
/*
if
(
NULL
==
pCatalog
->
dbCache
.
cache
)
{
if
(
NULL
==
pCatalog
->
dbCache
.
cache
)
{
*
exist
=
0
;
*
exist
=
0
;
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
taosHashGet(SHashObj * pHashObj, const void * key, size_t keyLen)
SDBVgroupInfo
*
info
=
taosHashGet
(
pCatalog
->
dbCache
.
cache
,
dbName
,
strlen
(
dbName
));
if
(
NULL
==
info
||
info
->
vgroupVersion
<
pCatalog
->
vgroupCache
.
vgroupVersion
)
{
*
exist
=
0
;
return
TSDB_CODE_SUCCESS
;
}
if
(
dbInfo
)
{
if
(
dbInfo
)
{
*pVgroupList = taosArrayDup(pCatalog->vgroupCache.arrayCache);
*
dbInfo
=
calloc
(
1
,
sizeof
(
**
dbInfo
));
if
(
NULL
==
*
dbInfo
)
{
ctgError
(
"calloc size[%d] failed"
,
(
int32_t
)
sizeof
(
**
dbInfo
));
return
TSDB_CODE_CTG_MEM_ERROR
;
}
(
*
dbInfo
)
->
vgId
=
taosArrayDup
(
info
->
vgId
);
if
(
NULL
==
(
*
dbInfo
)
->
vgId
)
{
ctgError
(
"taos array duplicate failed"
);
tfree
(
*
dbInfo
);
return
TSDB_CODE_CTG_MEM_ERROR
;
}
(
*
dbInfo
)
->
vgroupVersion
=
info
->
vgroupVersion
;
(
*
dbInfo
)
->
hashRange
=
info
->
hashRange
;
}
}
*
exist
=
1
;
*
exist
=
1
;
*/
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgGetDBVgroupFromMnode
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
SBuildUseDBInput
*
input
,
SUseDbOutput
*
out
)
{
char
*
msg
=
NULL
;
SEpSet
*
pVnodeEpSet
=
NULL
;
int32_t
msgLen
=
0
;
int32_t
code
=
queryBuildMsg
[
TSDB_MSG_TYPE_USE_DB
](
input
,
&
msg
,
0
,
&
msgLen
);
if
(
code
)
{
return
code
;
}
SRpcMsg
rpcMsg
=
{
.
msgType
=
TSDB_MSG_TYPE_USE_DB
,
.
pCont
=
msg
,
.
contLen
=
msgLen
,
};
SRpcMsg
rpcRsp
=
{
0
};
rpcSendRecv
(
pRpc
,
(
SEpSet
*
)
pMgmtEps
,
&
rpcMsg
,
&
rpcRsp
);
code
=
queryProcessMsgRsp
[
TSDB_MSG_TYPE_USE_DB
](
out
,
rpcRsp
.
pCont
,
rpcRsp
.
contLen
);
if
(
code
)
{
return
code
;
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -144,7 +192,7 @@ int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version) {
...
@@ -144,7 +192,7 @@ int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version) {
int32_t
catalogUpdateVgroup
(
struct
SCatalog
*
pCatalog
,
SVgroupListInfo
*
pVgroup
)
{
int32_t
catalogUpdateVgroup
(
struct
SCatalog
*
pCatalog
,
SVgroupListInfo
*
pVgroup
)
{
if
(
NULL
==
pVgroup
)
{
if
(
NULL
==
pVgroup
)
{
ctgError
(
"
vgroup get from mnode succeed, but no output
"
);
ctgError
(
"
no valid vgroup list info to update
"
);
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
}
}
...
@@ -262,7 +310,33 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
...
@@ -262,7 +310,33 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
}
}
int32_t
catalogUpdateDBVgroup
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
SDBVgroupInfo
*
dbInfo
)
{
int32_t
catalogUpdateDBVgroup
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
SDBVgroupInfo
*
dbInfo
)
{
if
(
NULL
==
pCatalog
||
NULL
==
dbName
||
NULL
==
dbInfo
)
{
return
TSDB_CODE_CTG_INVALID_INPUT
;
}
if
(
dbInfo
->
vgroupVersion
<
0
)
{
if
(
pCatalog
->
dbCache
.
cache
)
{
taosHashRemove
(
pCatalog
->
dbCache
.
cache
,
dbName
,
strlen
(
dbName
));
}
ctgWarn
(
"remove db [%s] from cache"
,
dbName
);
return
TSDB_CODE_SUCCESS
;
}
if
(
NULL
==
pCatalog
->
dbCache
.
cache
)
{
pCatalog
->
dbCache
.
cache
=
taosHashInit
(
CTG_DEFAULT_DB_NUMBER
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pCatalog
->
dbCache
.
cache
)
{
ctgError
(
"init hash[%d] for db cache failed"
,
CTG_DEFAULT_DB_NUMBER
);
return
TSDB_CODE_CTG_MEM_ERROR
;
}
}
if
(
taosHashPut
(
pCatalog
->
dbCache
.
cache
,
dbName
,
strlen
(
dbName
),
dbInfo
,
sizeof
(
*
dbInfo
))
!=
0
)
{
ctgError
(
"push to vgroup hash cache failed"
);
return
TSDB_CODE_CTG_MEM_ERROR
;
}
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -273,8 +347,8 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
...
@@ -273,8 +347,8 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
return
TSDB_CODE_CTG_INVALID_INPUT
;
return
TSDB_CODE_CTG_INVALID_INPUT
;
}
}
/*
int32_t
exist
=
0
;
int32_t
exist
=
0
;
int32_t
code
=
0
;
if
(
0
==
forceUpdate
)
{
if
(
0
==
forceUpdate
)
{
CTG_ERR_RET
(
ctgGetDBVgroupFromCache
(
pCatalog
,
dbName
,
dbInfo
,
&
exist
));
CTG_ERR_RET
(
ctgGetDBVgroupFromCache
(
pCatalog
,
dbName
,
dbInfo
,
&
exist
));
...
@@ -284,18 +358,34 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
...
@@ -284,18 +358,34 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
}
}
}
}
SDBVgroupInfo* newDbInfo = NULL;
SUseDbOutput
DbOut
=
{
0
};
SBuildUseDBInput
input
=
{
0
};
strncpy
(
input
.
db
,
dbName
,
sizeof
(
input
.
db
));
input
.
db
[
sizeof
(
input
.
db
)
-
1
]
=
0
;
input
.
vgroupVersion
=
pCatalog
->
vgroupCache
.
vgroupVersion
;
input
.
dbGroupVersion
=
CTG_DEFAULT_INVALID_VERSION
;
CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps,
dbName, &newDbInfo
));
CTG_ERR_RET
(
ctgGetDBVgroupFromMnode
(
pCatalog
,
pRpc
,
pMgmtEps
,
&
input
,
&
DbOut
));
CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, newDbInfo));
if
(
DbOut
.
vgroupList
)
{
CTG_ERR_JRET
(
catalogUpdateVgroup
(
pCatalog
,
DbOut
.
vgroupList
));
}
if
(
DbOut
.
dbVgroup
)
{
CTG_ERR_JRET
(
catalogUpdateDBVgroup
(
pCatalog
,
dbName
,
DbOut
.
dbVgroup
));
}
if
(
dbInfo
)
{
if
(
dbInfo
)
{
*dbInfo = newDbInfo;
*
dbInfo
=
DbOut
.
dbVgroup
;
DbOut
.
dbVgroup
=
NULL
;
}
}
*/
return
TSDB_CODE_SUCCESS
;
_return:
tfree
(
DbOut
.
dbVgroup
);
tfree
(
DbOut
.
vgroupList
);
return
code
;
}
}
...
...
source/libs/parser/CMakeLists.txt
浏览文件 @
924e3ad6
...
@@ -8,7 +8,7 @@ target_include_directories(
...
@@ -8,7 +8,7 @@ target_include_directories(
target_link_libraries
(
target_link_libraries
(
parser
parser
PRIVATE os util common catalog function transport
PRIVATE os util common catalog function transport
query
)
)
ADD_SUBDIRECTORY
(
test
)
ADD_SUBDIRECTORY
(
test
)
\ No newline at end of file
source/libs/parser/test/CMakeLists.txt
浏览文件 @
924e3ad6
...
@@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
...
@@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE
(
parserTest
${
SOURCE_LIST
}
)
ADD_EXECUTABLE
(
parserTest
${
SOURCE_LIST
}
)
TARGET_LINK_LIBRARIES
(
TARGET_LINK_LIBRARIES
(
parserTest
parserTest
PUBLIC os util common parser catalog transport gtest function planner
PUBLIC os util common parser catalog transport gtest function planner
query
)
)
TARGET_INCLUDE_DIRECTORIES
(
TARGET_INCLUDE_DIRECTORIES
(
...
...
source/libs/planner/CMakeLists.txt
浏览文件 @
924e3ad6
...
@@ -8,7 +8,7 @@ target_include_directories(
...
@@ -8,7 +8,7 @@ target_include_directories(
target_link_libraries
(
target_link_libraries
(
planner
planner
PRIVATE os util common catalog parser transport function
PRIVATE os util common catalog parser transport function
query
)
)
ADD_SUBDIRECTORY
(
test
)
ADD_SUBDIRECTORY
(
test
)
\ No newline at end of file
source/libs/planner/test/CMakeLists.txt
浏览文件 @
924e3ad6
...
@@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
...
@@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE
(
plannerTest
${
SOURCE_LIST
}
)
ADD_EXECUTABLE
(
plannerTest
${
SOURCE_LIST
}
)
TARGET_LINK_LIBRARIES
(
TARGET_LINK_LIBRARIES
(
plannerTest
plannerTest
PUBLIC os util common planner parser catalog transport gtest function
PUBLIC os util common planner parser catalog transport gtest function
query
)
)
TARGET_INCLUDE_DIRECTORIES
(
TARGET_INCLUDE_DIRECTORIES
(
...
...
source/libs/query/src/querymsg.c
浏览文件 @
924e3ad6
...
@@ -15,7 +15,7 @@
...
@@ -15,7 +15,7 @@
#include "taosmsg.h"
#include "taosmsg.h"
#include "queryInt.h"
#include "queryInt.h"
#include "query.h"
int32_t
(
*
queryBuildMsg
[
TSDB_MSG_TYPE_MAX
])(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
)
=
{
0
};
int32_t
(
*
queryBuildMsg
[
TSDB_MSG_TYPE_MAX
])(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
)
=
{
0
};
...
@@ -60,6 +60,36 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3
...
@@ -60,6 +60,36 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
queryBuildUseDbMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
)
{
if
(
NULL
==
input
||
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
SBuildUseDBInput
*
bInput
=
(
SBuildUseDBInput
*
)
input
;
int32_t
estimateSize
=
sizeof
(
SUseDbMsg
);
if
(
NULL
==
*
msg
||
msgSize
<
estimateSize
)
{
tfree
(
*
msg
);
*
msg
=
calloc
(
1
,
estimateSize
);
if
(
NULL
==
*
msg
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
SUseDbMsg
*
bMsg
=
(
SUseDbMsg
*
)
*
msg
;
strncpy
(
bMsg
->
db
,
bInput
->
db
,
sizeof
(
bMsg
->
db
));
bMsg
->
db
[
sizeof
(
bMsg
->
db
)
-
1
]
=
0
;
bMsg
->
vgroupVersion
=
bInput
->
vgroupVersion
;
bMsg
->
dbGroupVersion
=
bInput
->
dbGroupVersion
;
*
msgLen
=
(
int32_t
)
sizeof
(
*
bMsg
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryProcessVgroupListRsp
(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
{
int32_t
queryProcessVgroupListRsp
(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
{
if
(
NULL
==
output
||
NULL
==
msg
||
msgSize
<=
0
)
{
if
(
NULL
==
output
||
NULL
==
msg
||
msgSize
<=
0
)
{
...
@@ -103,12 +133,126 @@ int32_t queryProcessVgroupListRsp(void* output, char *msg, int32_t msgSize) {
...
@@ -103,12 +133,126 @@ int32_t queryProcessVgroupListRsp(void* output, char *msg, int32_t msgSize) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
queryProcessUseDBRsp
(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
{
if
(
NULL
==
output
||
NULL
==
msg
||
msgSize
<=
0
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
SUseDbRspMsg
*
pRsp
=
(
SUseDbRspMsg
*
)
msg
;
SUseDbOutput
*
pOut
=
(
SUseDbOutput
*
)
output
;
int32_t
code
=
0
;
if
(
msgSize
<=
sizeof
(
*
pRsp
))
{
qError
(
"invalid use db rsp msg size, msgSize:%d"
,
msgSize
);
return
TSDB_CODE_TSC_VALUE_OUT_OF_RANGE
;
}
pRsp
->
vgroupVersion
=
htonl
(
pRsp
->
vgroupVersion
);
pRsp
->
dbVgroupVersion
=
htonl
(
pRsp
->
dbVgroupVersion
);
pRsp
->
vgroupNum
=
htonl
(
pRsp
->
vgroupNum
);
pRsp
->
dbVgroupNum
=
htonl
(
pRsp
->
dbVgroupNum
);
if
(
pRsp
->
vgroupNum
<
0
)
{
qError
(
"invalid vgroup number[%d]"
,
pRsp
->
vgroupNum
);
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
if
(
pRsp
->
dbVgroupNum
<
0
)
{
qError
(
"invalid db vgroup number[%d]"
,
pRsp
->
dbVgroupNum
);
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
int32_t
expectSize
=
pRsp
->
vgroupNum
*
sizeof
(
pRsp
->
vgroupInfo
[
0
])
+
pRsp
->
dbVgroupNum
*
sizeof
(
int32_t
)
+
sizeof
(
*
pRsp
);
if
(
msgSize
!=
expectSize
)
{
qError
(
"vgroup list msg size mis-match, msgSize:%d, expected:%d, vgroup number:%d, db vgroup number:%d"
,
msgSize
,
expectSize
,
pRsp
->
vgroupNum
,
pRsp
->
dbVgroupNum
);
return
TSDB_CODE_TSC_VALUE_OUT_OF_RANGE
;
}
if
(
pRsp
->
vgroupVersion
<
0
)
{
qInfo
(
"no new vgroup list info"
);
if
(
pRsp
->
vgroupNum
!=
0
)
{
qError
(
"invalid vgroup number[%d] for no new vgroup list case"
,
pRsp
->
vgroupNum
);
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
}
else
{
int32_t
s
=
sizeof
(
*
pOut
->
vgroupList
)
+
sizeof
(
pOut
->
vgroupList
->
vgroupInfo
[
0
])
*
pRsp
->
vgroupNum
;
pOut
->
vgroupList
=
calloc
(
1
,
s
);
if
(
NULL
==
pOut
->
vgroupList
)
{
qError
(
"calloc size[%d] failed"
,
s
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
pOut
->
vgroupList
->
vgroupNum
=
pRsp
->
vgroupNum
;
pOut
->
vgroupList
->
vgroupVersion
=
pRsp
->
vgroupVersion
;
for
(
int32_t
i
=
0
;
i
<
pRsp
->
vgroupNum
;
++
i
)
{
pRsp
->
vgroupInfo
[
i
].
vgId
=
htonl
(
pRsp
->
vgroupInfo
[
i
].
vgId
);
for
(
int32_t
n
=
0
;
n
<
pRsp
->
vgroupInfo
[
i
].
numOfEps
;
++
n
)
{
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
=
htonl
(
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
);
}
memcpy
(
&
pOut
->
vgroupList
->
vgroupInfo
[
i
],
&
pRsp
->
vgroupInfo
[
i
],
sizeof
(
pRsp
->
vgroupInfo
[
i
]));
}
}
int32_t
*
vgIdList
=
(
int32_t
*
)((
char
*
)
pRsp
->
vgroupInfo
+
sizeof
(
pRsp
->
vgroupInfo
[
0
])
*
pRsp
->
vgroupNum
);
memcpy
(
pOut
->
db
,
pRsp
->
db
,
sizeof
(
pOut
->
db
));
if
(
pRsp
->
dbVgroupVersion
<
0
)
{
qInfo
(
"no new vgroup info for db[%s]"
,
pRsp
->
db
);
}
else
{
pOut
->
dbVgroup
=
calloc
(
1
,
sizeof
(
*
pOut
->
dbVgroup
));
if
(
NULL
==
pOut
->
dbVgroup
)
{
qError
(
"calloc size[%d] failed"
,
(
int32_t
)
sizeof
(
*
pOut
->
dbVgroup
));
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
_exit
;
}
pOut
->
dbVgroup
->
vgId
=
taosArrayInit
(
pRsp
->
dbVgroupNum
,
sizeof
(
int32_t
));
if
(
NULL
==
pOut
->
dbVgroup
->
vgId
)
{
qError
(
"taosArrayInit size[%d] failed"
,
pRsp
->
dbVgroupNum
);
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
_exit
;
}
pOut
->
dbVgroup
->
vgroupVersion
=
pRsp
->
dbVgroupVersion
;
pOut
->
dbVgroup
->
hashRange
=
htonl
(
pRsp
->
dbHashRange
);
for
(
int32_t
i
=
0
;
i
<
pRsp
->
dbVgroupNum
;
++
i
)
{
*
(
vgIdList
+
i
)
=
htonl
(
*
(
vgIdList
+
i
));
taosArrayPush
(
pOut
->
dbVgroup
->
vgId
,
vgIdList
+
i
)
;
}
}
return
code
;
_exit:
if
(
pOut
->
dbVgroup
&&
pOut
->
dbVgroup
->
vgId
)
{
taosArrayDestroy
(
pOut
->
dbVgroup
->
vgId
);
pOut
->
dbVgroup
->
vgId
=
NULL
;
}
tfree
(
pOut
->
dbVgroup
);
tfree
(
pOut
->
vgroupList
);
return
code
;
}
void
msgInit
()
{
void
msgInit
()
{
queryBuildMsg
[
TSDB_MSG_TYPE_TABLE_META
]
=
queryBuildTableMetaReqMsg
;
queryBuildMsg
[
TSDB_MSG_TYPE_TABLE_META
]
=
queryBuildTableMetaReqMsg
;
queryBuildMsg
[
TSDB_MSG_TYPE_VGROUP_LIST
]
=
queryBuildVgroupListReqMsg
;
queryBuildMsg
[
TSDB_MSG_TYPE_VGROUP_LIST
]
=
queryBuildVgroupListReqMsg
;
queryBuildMsg
[
TSDB_MSG_TYPE_USE_DB
]
=
queryBuildUseDbMsg
;
//tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = tscProcessTableMetaRsp;
//tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = tscProcessTableMetaRsp;
queryProcessMsgRsp
[
TSDB_MSG_TYPE_VGROUP_LIST
]
=
queryProcessVgroupListRsp
;
queryProcessMsgRsp
[
TSDB_MSG_TYPE_VGROUP_LIST
]
=
queryProcessVgroupListRsp
;
queryProcessMsgRsp
[
TSDB_MSG_TYPE_USE_DB
]
=
queryProcessUseDBRsp
;
/*
/*
tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录