Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
3b872546
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
3b872546
编写于
12月 16, 2021
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
change use db msg and remove vgroup cache
上级
62bbbe33
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
290 addition
and
420 deletion
+290
-420
include/common/taosmsg.h
include/common/taosmsg.h
+10
-13
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+1
-15
include/libs/query/query.h
include/libs/query/query.h
+6
-7
source/libs/catalog/CMakeLists.txt
source/libs/catalog/CMakeLists.txt
+2
-0
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+73
-245
source/libs/catalog/test/CMakeLists.txt
source/libs/catalog/test/CMakeLists.txt
+18
-0
source/libs/catalog/test/catalogTests.cpp
source/libs/catalog/test/catalogTests.cpp
+152
-0
source/libs/query/src/querymsg.c
source/libs/query/src/querymsg.c
+28
-140
未找到文件。
include/common/taosmsg.h
浏览文件 @
3b872546
...
...
@@ -221,8 +221,7 @@ typedef struct SBuildTableMetaInput {
typedef
struct
SBuildUseDBInput
{
char
db
[
TSDB_TABLE_FNAME_LEN
];
int32_t
vgroupVersion
;
int32_t
dbGroupVersion
;
int32_t
vgVersion
;
}
SBuildUseDBInput
;
...
...
@@ -627,8 +626,7 @@ typedef struct {
typedef
struct
{
char
db
[
TSDB_TABLE_FNAME_LEN
];
int8_t
ignoreNotExists
;
int32_t
vgroupVersion
;
int32_t
dbGroupVersion
;
int32_t
vgVersion
;
int32_t
reserve
[
8
];
}
SUseDbMsg
;
...
...
@@ -808,6 +806,9 @@ typedef struct SSTableVgroupMsg {
typedef
struct
SVgroupInfo
{
int32_t
vgId
;
int32_t
hashBegin
;
int32_t
hashEnd
;
int8_t
inUse
;
int8_t
numOfEps
;
SEpAddrMsg
epAddr
[
TSDB_MAX_REPLICA
];
}
SVgroupInfo
;
...
...
@@ -863,16 +864,12 @@ typedef struct {
}
STagData
;
typedef
struct
{
int32_t
vgroupNum
;
int32_t
vgroupVersion
;
char
db
[
TSDB_TABLE_FNAME_LEN
];
int32_t
dbVgroupVersion
;
int32_t
dbVgroupNum
;
int32_t
dbHashRange
;
int32_t
dbHashType
;
char
db
[
TSDB_FULL_DB_NAME_LEN
];
int32_t
vgVersion
;
int32_t
vgNum
;
int8_t
hashMethod
;
SVgroupInfo
vgroupInfo
[];
//int32_t vgIdList[];
}
SUseDbRspMsg
;
}
SUseDbRsp
;
...
...
include/libs/catalog/catalog.h
浏览文件 @
3b872546
...
...
@@ -61,22 +61,8 @@ int32_t catalogInit(SCatalogCfg *cfg);
*/
int32_t
catalogGetHandle
(
const
char
*
clusterId
,
struct
SCatalog
**
catalogHandle
);
int32_t
catalogGetVgroupVersion
(
struct
SCatalog
*
pCatalog
,
int32_t
*
version
);
/**
* get cluster vgroup list.
* @pVgroupList - hash of vgroup list, key:vgId, value:SVgroupInfo
* @return
*/
int32_t
catalogGetVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
SHashObj
**
pVgroupHash
);
int32_t
catalogUpdateVgroupCache
(
struct
SCatalog
*
pCatalog
,
SVgroupListInfo
*
pVgroup
);
int32_t
catalogGetDBVgroupVersion
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
int32_t
*
version
);
int32_t
catalogGetDBVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
dbName
,
int32_t
forceUpdate
,
SDBVgroupInfo
*
*
dbInfo
);
int32_t
catalogGetDBVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
dbName
,
int32_t
forceUpdate
,
SDBVgroupInfo
*
dbInfo
);
int32_t
catalogUpdateDBVgroupCache
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
SDBVgroupInfo
*
dbInfo
);
...
...
include/libs/query/query.h
浏览文件 @
3b872546
...
...
@@ -21,6 +21,7 @@ extern "C" {
#endif
#include "tarray.h"
#include "thash.h"
typedef
SVgroupListRspMsg
SVgroupListInfo
;
...
...
@@ -63,16 +64,14 @@ typedef struct STableMeta {
typedef
struct
SDBVgroupInfo
{
int32_t
vgroupVersion
;
SArray
*
vgId
;
int32_t
hashRange
;
int32_t
hashType
;
int32_t
vgVersion
;
int8_t
hashMethod
;
SHashObj
*
vgInfo
;
//key:vgId, value:SVgroupInfo
}
SDBVgroupInfo
;
typedef
struct
SUseDbOutput
{
SVgroupListInfo
*
vgroupList
;
char
db
[
TSDB_TABLE_FNAME_LEN
];
SDBVgroupInfo
*
dbVgroup
;
char
db
[
TSDB_FULL_DB_NAME_LEN
];
SDBVgroupInfo
dbVgroup
;
}
SUseDbOutput
;
typedef
struct
STableMetaOutput
{
...
...
source/libs/catalog/CMakeLists.txt
浏览文件 @
3b872546
...
...
@@ -10,3 +10,5 @@ target_link_libraries(
catalog
PRIVATE os util common transport query
)
ADD_SUBDIRECTORY
(
test
)
\ No newline at end of file
source/libs/catalog/src/catalog.c
浏览文件 @
3b872546
...
...
@@ -20,50 +20,7 @@
SCatalogMgmt
ctgMgmt
=
{
0
};
int32_t
ctgGetVgroupFromMnode
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
SVgroupListInfo
**
pVgroup
)
{
char
*
msg
=
NULL
;
SEpSet
*
pVnodeEpSet
=
NULL
;
int32_t
msgLen
=
0
;
int32_t
code
=
queryBuildMsg
[
TSDB_MSG_TYPE_VGROUP_LIST
](
NULL
,
&
msg
,
0
,
&
msgLen
);
if
(
code
)
{
return
code
;
}
SRpcMsg
rpcMsg
=
{
.
msgType
=
TSDB_MSG_TYPE_VGROUP_LIST
,
.
pCont
=
msg
,
.
contLen
=
msgLen
,
};
SRpcMsg
rpcRsp
=
{
0
};
rpcSendRecv
(
pRpc
,
(
SEpSet
*
)
pMgmtEps
,
&
rpcMsg
,
&
rpcRsp
);
code
=
queryProcessMsgRsp
[
TSDB_MSG_TYPE_VGROUP_LIST
](
pVgroup
,
rpcRsp
.
pCont
,
rpcRsp
.
contLen
);
if
(
code
)
{
return
code
;
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgGetVgroupFromCache
(
struct
SCatalog
*
pCatalog
,
SHashObj
**
pVgroupList
,
int32_t
*
exist
)
{
if
(
NULL
==
pCatalog
->
vgroupCache
.
cache
||
pCatalog
->
vgroupCache
.
vgroupVersion
<
0
)
{
*
exist
=
0
;
return
TSDB_CODE_SUCCESS
;
}
if
(
pVgroupList
)
{
*
pVgroupList
=
pCatalog
->
vgroupCache
.
cache
;
}
*
exist
=
1
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgGetDBVgroupFromCache
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
SDBVgroupInfo
**
dbInfo
,
int32_t
*
exist
)
{
int32_t
ctgGetDBVgroupFromCache
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
SDBVgroupInfo
*
dbInfo
,
int32_t
*
exist
)
{
if
(
NULL
==
pCatalog
->
dbCache
.
cache
)
{
*
exist
=
0
;
return
TSDB_CODE_SUCCESS
;
...
...
@@ -71,28 +28,13 @@ int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, S
SDBVgroupInfo
*
info
=
taosHashGet
(
pCatalog
->
dbCache
.
cache
,
dbName
,
strlen
(
dbName
));
if
(
NULL
==
info
||
info
->
vgroupVersion
<
pCatalog
->
vgroupCache
.
vgroupVersion
)
{
if
(
NULL
==
info
)
{
*
exist
=
0
;
return
TSDB_CODE_SUCCESS
;
}
if
(
dbInfo
)
{
*
dbInfo
=
calloc
(
1
,
sizeof
(
**
dbInfo
));
if
(
NULL
==
*
dbInfo
)
{
ctgError
(
"calloc size[%d] failed"
,
(
int32_t
)
sizeof
(
**
dbInfo
));
return
TSDB_CODE_CTG_MEM_ERROR
;
}
(
*
dbInfo
)
->
vgId
=
taosArrayDup
(
info
->
vgId
);
if
(
NULL
==
(
*
dbInfo
)
->
vgId
)
{
ctgError
(
"taos array duplicate failed"
);
tfree
(
*
dbInfo
);
return
TSDB_CODE_CTG_MEM_ERROR
;
}
(
*
dbInfo
)
->
vgroupVersion
=
info
->
vgroupVersion
;
(
*
dbInfo
)
->
hashRange
=
info
->
hashRange
;
(
*
dbInfo
)
->
hashType
=
info
->
hashType
;
*
dbInfo
=
*
info
;
}
*
exist
=
1
;
...
...
@@ -242,8 +184,8 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE
}
int32_t
ctgGetHashFunction
(
int
32_t
hashType
,
tableNameHashFp
*
fp
)
{
switch
(
hash
Type
)
{
int32_t
ctgGetHashFunction
(
int
8_t
hashMethod
,
tableNameHashFp
*
fp
)
{
switch
(
hash
Method
)
{
default:
*
fp
=
MurmurHash3_32
;
break
;
...
...
@@ -252,96 +194,79 @@ int32_t ctgGetHashFunction(int32_t hashType, tableNameHashFp *fp) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgGetVg
roupFromVgId
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
int32_t
vgId
,
SVgroupInfo
*
pVgroup
)
{
int32_t
ctgGetVg
InfoFromDB
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
SDBVgroupInfo
*
dbInfo
,
SArray
*
vgroupList
)
{
SHashObj
*
vgroupHash
=
NULL
;
CTG_ERR_RET
(
catalogGetVgroup
(
pCatalog
,
pRpc
,
pMgmtEps
,
&
vgroupHash
));
if
(
NULL
==
vgroupHash
)
{
ctgError
(
"get empty vgroup cache"
);
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
}
SVgroupInfo
*
vgInfo
=
NULL
;
void
*
pIter
=
taosHashIterate
(
dbInfo
->
vgInfo
,
NULL
);
while
(
pIter
)
{
vgInfo
=
pIter
;
if
(
NULL
==
taosHashGetClone
(
vgroupHash
,
&
vgId
,
sizeof
(
vgId
),
pVgroup
))
{
ctgError
(
"vgId[%d] not found in vgroup list"
,
vgId
);
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
if
(
NULL
==
taosArrayPush
(
vgroupList
,
vgInfo
))
{
ctgError
(
"taosArrayPush failed"
);
break
;
}
pIter
=
taosHashIterate
(
dbInfo
->
vgInfo
,
pIter
);
vgInfo
=
NULL
;
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgGetVgroupFromVgIdBatch
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
SArray
*
vgIds
,
SArray
*
vgroupList
)
{
SHashObj
*
vgroupHash
=
NULL
;
SVgroupInfo
pVgroup
=
{
0
};
int32_t
vgIdNum
=
taosArrayGetSize
(
vgIds
);
CTG_ERR_RET
(
catalogGetVgroup
(
pCatalog
,
pRpc
,
pMgmtEps
,
&
vgroupHash
));
if
(
NULL
==
vgroupHash
)
{
ctgError
(
"get empty vgroup cache"
);
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
int32_t
ctgGetVgInfoFromHashValue
(
SDBVgroupInfo
*
dbInfo
,
const
char
*
pDBName
,
const
char
*
pTableName
,
SVgroupInfo
*
pVgroup
)
{
int32_t
vgNum
=
taosHashGetSize
(
dbInfo
->
vgInfo
);
if
(
vgNum
<=
0
)
{
ctgError
(
"db[%s] vgroup cache invalid, vgroup number:%d"
,
pDBName
,
vgNum
);
return
TSDB_CODE_TSC_DB_NOT_SELECTED
;
}
for
(
int32_t
i
=
0
;
i
<
vgIdNum
;
++
i
)
{
int32_t
*
vgId
=
taosArrayGet
(
vgIds
,
i
);
if
(
NULL
==
taosHashGetClone
(
vgroupHash
,
vgId
,
sizeof
(
*
vgId
),
&
pVgroup
))
{
ctgError
(
"vgId[%d] not found in vgroup list"
,
vgId
);
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
}
tableNameHashFp
fp
=
NULL
;
SVgroupInfo
*
vgInfo
=
NULL
;
if
(
NULL
==
taosArrayPush
(
vgroupList
,
&
pVgroup
))
{
ctgError
(
"push vgroup to array failed, idx:%d"
,
i
);
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
CTG_ERR_RET
(
ctgGetHashFunction
(
dbInfo
->
hashMethod
,
&
fp
));
char
tbFullName
[
TSDB_TABLE_FNAME_LEN
];
snprintf
(
tbFullName
,
sizeof
(
tbFullName
),
"%s.%s"
,
pDBName
,
pTableName
);
uint32_t
hashValue
=
(
*
fp
)(
tbFullName
,
(
uint32_t
)
strlen
(
tbFullName
));
void
*
pIter
=
taosHashIterate
(
dbInfo
->
vgInfo
,
NULL
);
while
(
pIter
)
{
vgInfo
=
pIter
;
if
(
hashValue
>=
vgInfo
->
hashBegin
&&
hashValue
<=
vgInfo
->
hashEnd
)
{
break
;
}
pIter
=
taosHashIterate
(
dbInfo
->
vgInfo
,
pIter
);
vgInfo
=
NULL
;
}
if
(
NULL
==
vgInfo
)
{
ctgError
(
"no hash range found for hashvalue[%u]"
,
hashValue
);
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
}
*
pVgroup
=
*
vgInfo
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgGetTableHashVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
SVgroupInfo
*
pVgroup
)
{
SDBVgroupInfo
*
dbInfo
=
NULL
;
SDBVgroupInfo
dbInfo
=
{
0
}
;
int32_t
code
=
0
;
int32_t
vgId
=
0
;
CTG_ERR_RET
(
catalogGetDBVgroup
(
pCatalog
,
pRpc
,
pMgmtEps
,
pDBName
,
false
,
&
dbInfo
));
if
(
NULL
==
db
Info
)
{
ctg
Warn
(
"db[%s] vgroup info not found"
,
pDBName
);
if
(
dbInfo
.
vgVersion
<
0
||
NULL
==
dbInfo
.
vg
Info
)
{
ctg
Error
(
"db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p"
,
pDBName
,
dbInfo
.
vgVersion
,
dbInfo
.
vgInfo
);
return
TSDB_CODE_TSC_DB_NOT_SELECTED
;
}
if
(
dbInfo
->
vgroupVersion
<
0
||
NULL
==
dbInfo
->
vgId
)
{
ctgError
(
"db[%s] vgroup cache invalid, vgroup version:%d, vgId:%p"
,
pDBName
,
dbInfo
->
vgroupVersion
,
dbInfo
->
vgId
);
CTG_ERR_JRET
(
TSDB_CODE_TSC_DB_NOT_SELECTED
);
}
int32_t
vgNum
=
taosArrayGetSize
(
dbInfo
->
vgId
);
if
(
vgNum
<=
0
)
{
ctgError
(
"db[%s] vgroup cache invalid, vgroup number:%d"
,
pDBName
,
vgNum
);
CTG_ERR_JRET
(
TSDB_CODE_TSC_DB_NOT_SELECTED
);
}
tableNameHashFp
fp
=
NULL
;
CTG_ERR_JRET
(
ctgGetHashFunction
(
dbInfo
->
hashType
,
&
fp
));
char
tbFullName
[
TSDB_TABLE_FNAME_LEN
];
snprintf
(
tbFullName
,
sizeof
(
tbFullName
),
"%s.%s"
,
pDBName
,
pTableName
);
uint32_t
hashValue
=
(
*
fp
)(
tbFullName
,
(
uint32_t
)
strlen
(
tbFullName
));
uint32_t
hashUnit
=
dbInfo
->
hashRange
/
vgNum
;
uint32_t
vgId
=
hashValue
/
hashUnit
;
CTG_ERR_JRET
(
ctgGetVgroupFromVgId
(
pCatalog
,
pRpc
,
pMgmtEps
,
vgId
,
pVgroup
));
_return:
if
(
dbInfo
&&
dbInfo
->
vgId
)
{
taosArrayDestroy
(
dbInfo
->
vgId
);
dbInfo
->
vgId
=
NULL
;
}
tfree
(
dbInfo
);
CTG_ERR_RET
(
ctgGetVgInfoFromHashValue
(
&
dbInfo
,
pDBName
,
pTableName
,
pVgroup
));
return
code
;
}
...
...
@@ -524,95 +449,6 @@ int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle)
return
TSDB_CODE_SUCCESS
;
}
int32_t
catalogGetVgroupVersion
(
struct
SCatalog
*
pCatalog
,
int32_t
*
version
)
{
if
(
NULL
==
pCatalog
||
NULL
==
version
)
{
return
TSDB_CODE_CTG_INVALID_INPUT
;
}
*
version
=
pCatalog
->
vgroupCache
.
vgroupVersion
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
catalogUpdateVgroupCache
(
struct
SCatalog
*
pCatalog
,
SVgroupListInfo
*
pVgroup
)
{
if
(
NULL
==
pVgroup
)
{
ctgError
(
"no valid vgroup list info to update"
);
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
}
if
(
pVgroup
->
vgroupVersion
<
0
)
{
ctgError
(
"vgroup version[%d] is invalid"
,
pVgroup
->
vgroupVersion
);
return
TSDB_CODE_CTG_INVALID_INPUT
;
}
if
(
NULL
==
pCatalog
->
vgroupCache
.
cache
)
{
pCatalog
->
vgroupCache
.
cache
=
taosHashInit
(
CTG_DEFAULT_CACHE_VGROUP_NUMBER
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pCatalog
->
vgroupCache
.
cache
)
{
ctgError
(
"init hash[%d] for cluster cache failed"
,
CTG_DEFAULT_CACHE_VGROUP_NUMBER
);
return
TSDB_CODE_CTG_MEM_ERROR
;
}
}
else
{
taosHashClear
(
pCatalog
->
vgroupCache
.
cache
);
}
SVgroupInfo
*
vInfo
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
vgroupNum
;
++
i
)
{
if
(
taosHashPut
(
pCatalog
->
vgroupCache
.
cache
,
&
pVgroup
->
vgroupInfo
[
i
].
vgId
,
sizeof
(
pVgroup
->
vgroupInfo
[
i
].
vgId
),
&
pVgroup
->
vgroupInfo
[
i
],
sizeof
(
pVgroup
->
vgroupInfo
[
i
]))
!=
0
)
{
ctgError
(
"push to vgroup hash cache failed"
);
goto
error_exit
;
}
}
pCatalog
->
vgroupCache
.
vgroupVersion
=
pVgroup
->
vgroupVersion
;
return
TSDB_CODE_SUCCESS
;
error_exit:
if
(
pCatalog
->
vgroupCache
.
cache
)
{
taosHashCleanup
(
pCatalog
->
vgroupCache
.
cache
);
pCatalog
->
vgroupCache
.
cache
=
NULL
;
}
pCatalog
->
vgroupCache
.
vgroupVersion
=
CTG_DEFAULT_INVALID_VERSION
;
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
}
int32_t
catalogGetVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
SHashObj
**
pVgroupHash
)
{
if
(
NULL
==
pCatalog
||
NULL
==
pMgmtEps
||
NULL
==
pRpc
)
{
return
TSDB_CODE_CTG_INVALID_INPUT
;
}
int32_t
exist
=
0
;
CTG_ERR_RET
(
ctgGetVgroupFromCache
(
pCatalog
,
pVgroupHash
,
&
exist
));
if
(
exist
)
{
return
TSDB_CODE_SUCCESS
;
}
SVgroupListInfo
*
pVgroup
=
NULL
;
CTG_ERR_RET
(
ctgGetVgroupFromMnode
(
pCatalog
,
pRpc
,
pMgmtEps
,
&
pVgroup
));
CTG_ERR_RET
(
catalogUpdateVgroupCache
(
pCatalog
,
pVgroup
));
if
(
pVgroupHash
)
{
CTG_ERR_RET
(
ctgGetVgroupFromCache
(
pCatalog
,
pVgroupHash
,
&
exist
));
}
if
(
0
==
exist
)
{
ctgError
(
"catalog fetched but get from cache failed"
);
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
catalogGetDBVgroupVersion
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
int32_t
*
version
)
{
if
(
NULL
==
pCatalog
||
NULL
==
dbName
||
NULL
==
version
)
{
return
TSDB_CODE_CTG_INVALID_INPUT
;
...
...
@@ -629,7 +465,7 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
return
TSDB_CODE_SUCCESS
;
}
*
version
=
dbInfo
->
vg
roup
Version
;
*
version
=
dbInfo
->
vgVersion
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -639,7 +475,7 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName
return
TSDB_CODE_CTG_INVALID_INPUT
;
}
if
(
dbInfo
->
vg
roup
Version
<
0
)
{
if
(
dbInfo
->
vgVersion
<
0
)
{
if
(
pCatalog
->
dbCache
.
cache
)
{
taosHashRemove
(
pCatalog
->
dbCache
.
cache
,
dbName
,
strlen
(
dbName
));
}
...
...
@@ -654,6 +490,12 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName
ctgError
(
"init hash[%d] for db cache failed"
,
CTG_DEFAULT_CACHE_DB_NUMBER
);
return
TSDB_CODE_CTG_MEM_ERROR
;
}
}
else
{
SDBVgroupInfo
*
oldInfo
=
taosHashGet
(
pCatalog
->
dbCache
.
cache
,
dbName
,
strlen
(
dbName
));
if
(
oldInfo
&&
oldInfo
->
vgInfo
)
{
taosHashCleanup
(
oldInfo
->
vgInfo
);
oldInfo
->
vgInfo
=
NULL
;
}
}
if
(
taosHashPut
(
pCatalog
->
dbCache
.
cache
,
dbName
,
strlen
(
dbName
),
dbInfo
,
sizeof
(
*
dbInfo
))
!=
0
)
{
...
...
@@ -667,7 +509,7 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName
int32_t
catalogGetDBVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
dbName
,
int32_t
forceUpdate
,
SDBVgroupInfo
*
*
dbInfo
)
{
int32_t
catalogGetDBVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
dbName
,
int32_t
forceUpdate
,
SDBVgroupInfo
*
dbInfo
)
{
if
(
NULL
==
pCatalog
||
NULL
==
dbName
||
NULL
==
pRpc
||
NULL
==
pMgmtEps
)
{
return
TSDB_CODE_CTG_INVALID_INPUT
;
}
...
...
@@ -688,28 +530,16 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
strncpy
(
input
.
db
,
dbName
,
sizeof
(
input
.
db
));
input
.
db
[
sizeof
(
input
.
db
)
-
1
]
=
0
;
input
.
vgroupVersion
=
pCatalog
->
vgroupCache
.
vgroupVersion
;
input
.
dbGroupVersion
=
CTG_DEFAULT_INVALID_VERSION
;
input
.
vgVersion
=
CTG_DEFAULT_INVALID_VERSION
;
CTG_ERR_RET
(
ctgGetDBVgroupFromMnode
(
pCatalog
,
pRpc
,
pMgmtEps
,
&
input
,
&
DbOut
));
if
(
DbOut
.
vgroupList
)
{
CTG_ERR_JRET
(
catalogUpdateVgroupCache
(
pCatalog
,
DbOut
.
vgroupList
));
}
if
(
DbOut
.
dbVgroup
)
{
CTG_ERR_JRET
(
catalogUpdateDBVgroupCache
(
pCatalog
,
dbName
,
DbOut
.
dbVgroup
));
}
CTG_ERR_RET
(
catalogUpdateDBVgroupCache
(
pCatalog
,
dbName
,
&
DbOut
.
dbVgroup
));
if
(
dbInfo
)
{
*
dbInfo
=
DbOut
.
dbVgroup
;
DbOut
.
dbVgroup
=
NULL
;
}
_return:
tfree
(
DbOut
.
dbVgroup
);
tfree
(
DbOut
.
vgroupList
);
return
code
;
}
...
...
@@ -749,16 +579,20 @@ int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSe
STableMeta
*
tbMeta
=
NULL
;
int32_t
code
=
0
;
SVgroupInfo
vgroupInfo
=
{
0
};
SDBVgroupInfo
*
dbVgroup
=
NULL
;
SDBVgroupInfo
dbVgroup
=
{
0
}
;
CTG_ERR_JRET
(
catalogGetTableMeta
(
pCatalog
,
pRpc
,
pMgmtEps
,
pDBName
,
pTableName
,
&
tbMeta
));
if
(
tbMeta
->
tableType
==
TSDB_SUPER_TABLE
)
{
CTG_ERR_JRET
(
catalogGetDBVgroup
(
pCatalog
,
pRpc
,
pMgmtEps
,
pDBName
,
false
,
&
dbVgroup
));
CTG_ERR_JRET
(
catalogGetDBVgroup
(
pCatalog
,
pRpc
,
pMgmtEps
,
pDBName
,
false
,
&
dbVgroup
));
CTG_ERR_JRET
(
ctgGetVgroupFromVgIdBatch
(
pCatalog
,
pRpc
,
pMgmtEps
,
dbVgroup
->
vgId
,
pVgroupList
));
if
(
tbMeta
->
tableType
==
TSDB_SUPER_TABLE
)
{
CTG_ERR_JRET
(
ctgGetVgInfoFromDB
(
pCatalog
,
pRpc
,
pMgmtEps
,
&
dbVgroup
,
pVgroupList
));
}
else
{
CTG_ERR_JRET
(
ctgGetVgroupFromVgId
(
pCatalog
,
pRpc
,
pMgmtEps
,
tbMeta
->
vgId
,
&
vgroupInfo
));
int32_t
vgId
=
tbMeta
->
vgId
;
if
(
NULL
==
taosHashGetClone
(
dbVgroup
.
vgInfo
,
&
vgId
,
sizeof
(
vgId
),
&
vgroupInfo
))
{
ctgError
(
"vgId[%d] not found in vgroup list"
,
vgId
);
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
}
if
(
NULL
==
taosArrayPush
(
pVgroupList
,
&
vgroupInfo
))
{
ctgError
(
"push vgroupInfo to array failed"
);
...
...
@@ -768,12 +602,6 @@ int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSe
_return:
tfree
(
tbMeta
);
if
(
dbVgroup
&&
dbVgroup
->
vgId
)
{
taosArrayDestroy
(
dbVgroup
->
vgId
);
dbVgroup
->
vgId
=
NULL
;
}
tfree
(
dbVgroup
);
return
code
;
}
...
...
source/libs/catalog/test/CMakeLists.txt
0 → 100644
浏览文件 @
3b872546
MESSAGE
(
STATUS
"build catalog unit test"
)
# GoogleTest requires at least C++11
SET
(
CMAKE_CXX_STANDARD 11
)
AUX_SOURCE_DIRECTORY
(
${
CMAKE_CURRENT_SOURCE_DIR
}
SOURCE_LIST
)
ADD_EXECUTABLE
(
catalogTest
${
SOURCE_LIST
}
)
TARGET_LINK_LIBRARIES
(
catalogTest
PUBLIC os util common catalog transport gtest query
)
TARGET_INCLUDE_DIRECTORIES
(
catalogTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/catalog/"
PRIVATE
"
${
CMAKE_SOURCE_DIR
}
/source/libs/catalog/inc"
)
source/libs/catalog/test/catalogTests.cpp
浏览文件 @
3b872546
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <tglobal.h>
#include <iostream>
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "os.h"
#include "taos.h"
#include "tdef.h"
#include "tvariant.h"
#include "catalog.h"
namespace
{
}
TEST
(
testCase
,
normalCase
)
{
char
*
clusterId
=
"cluster1"
;
struct
SCatalog
*
pCtg
=
NULL
;
int32_t
code
=
catalogInit
(
NULL
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetHandle
(
clusterId
,
&
pCtg
);
ASSERT_EQ
(
code
,
0
);
}
/*
TEST(testCase, normalCase) {
SSqlInfo info1 = doGenerateAST("select top(a*b / 99, 20) from `t.1abc` interval(10s, 1s)");
ASSERT_EQ(info1.valid, true);
char msg[128] = {0};
SMsgBuf buf;
buf.len = 128;
buf.buf = msg;
SSqlNode* pNode = (SSqlNode*) taosArrayGetP(((SArray*)info1.sub.node), 0);
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
ASSERT_EQ(code, 0);
SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
SQueryStmtInfo* pQueryInfo = createQueryInfo();
setTableMetaInfo(pQueryInfo, &req);
SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.sub.node, 0);
ret = validateSqlNode(pSqlNode, pQueryInfo, &buf);
ASSERT_EQ(ret, 0);
SArray* pExprList = pQueryInfo->exprList[0];
int32_t num = tsCompatibleModel? 2:1;
ASSERT_EQ(taosArrayGetSize(pExprList), num);
SExprInfo* p1 = (SExprInfo*) taosArrayGetP(pExprList, 1);
ASSERT_EQ(p1->base.pColumns->uid, 110);
ASSERT_EQ(p1->base.numOfParams, 1);
ASSERT_EQ(p1->base.resSchema.type, TSDB_DATA_TYPE_DOUBLE);
ASSERT_STRCASEEQ(p1->base.resSchema.name, "top(a*b / 99, 20)");
ASSERT_EQ(p1->base.pColumns->flag, TSDB_COL_TMP);
ASSERT_STRCASEEQ(p1->base.token, "top(a*b / 99, 20)");
ASSERT_EQ(p1->base.interBytes, 16);
ASSERT_EQ(p1->pExpr->nodeType, TEXPR_FUNCTION_NODE);
ASSERT_STREQ(p1->pExpr->_function.functionName, "top");
tExprNode* pParam = p1->pExpr->_function.pChild[0];
ASSERT_EQ(pParam->nodeType, TEXPR_COL_NODE);
ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 3);
ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2);
struct SQueryPlanNode* n = nullptr;
code = createQueryPlan(pQueryInfo, &n);
char* str = NULL;
queryPlanToString(n, &str);
printf("%s\n", str);
destroyQueryInfo(pQueryInfo);
qParserClearupMetaRequestInfo(&req);
destroySqlInfo(&info1);
}
TEST(testCase, displayPlan) {
generateLogicplan("select count(*) from `t.1abc`");
generateLogicplan("select count(*)+ 22 from `t.1abc`");
generateLogicplan("select count(*)+ 22 from `t.1abc` interval(1h, 20s) sliding(10m) limit 20,30");
generateLogicplan("select count(*) from `t.1abc` group by a");
generateLogicplan("select count(A+B) from `t.1abc` group by a");
generateLogicplan("select count(length(a)+b) from `t.1abc` group by a");
generateLogicplan("select count(*) from `t.1abc` interval(10s, 5s) sliding(7s)");
generateLogicplan("select count(*) from `t.1abc` interval(10s, 5s) sliding(7s) order by 1 desc ");
generateLogicplan("select count(*),sum(a),avg(b),min(a+b)+99 from `t.1abc`");
generateLogicplan("select count(*), min(a) + 99 from `t.1abc`");
generateLogicplan("select count(length(count(*) + 22)) from `t.1abc`");
generateLogicplan("select concat(concat(a,b), concat(a,b)) from `t.1abc` limit 20");
generateLogicplan("select count(*), first(a), last(b) from `t.1abc` state_window(a)");
generateLogicplan("select count(*), first(a), last(b) from `t.1abc` session(ts, 20s)");
// order by + group by column + limit offset
generateLogicplan("select top(a, 20) k from `t.1abc` order by k asc limit 3 offset 1");
// fill
generateLogicplan("select min(a) from `t.1abc` where ts>now and ts<now+2h interval(1s) fill(linear)");
// union + union all
// join
// Aggregate(count(*) [count(*) #5056], sum(a) [sum(a) #5057], avg(b) [avg(b) #5058], min(a+b) [min(a+b) #5060])
// Projection(cols: [a+b #5059]) filters:(nil)
// Projection(cols: [ts #0], [a #1], [b #2]) filters:(nil)
// TableScan(t.1abc #110) time_range: -9223372036854775808 - 9223372036854775807
}
*/
int
main
(
int
argc
,
char
**
argv
)
{
testing
::
InitGoogleTest
(
&
argc
,
argv
);
return
RUN_ALL_TESTS
();
}
source/libs/query/src/querymsg.c
浏览文件 @
3b872546
...
...
@@ -21,17 +21,6 @@ int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msg
int32_t
(
*
queryProcessMsgRsp
[
TSDB_MSG_TYPE_MAX
])(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
=
{
0
};
int32_t
queryBuildVgroupListReqMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
)
{
if
(
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
*
msgLen
=
0
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryBuildTableMetaReqMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
)
{
if
(
NULL
==
input
||
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
...
...
@@ -81,8 +70,7 @@ int32_t queryBuildUseDbMsg(void* input, char **msg, int32_t msgSize, int32_t *ms
strncpy
(
bMsg
->
db
,
bInput
->
db
,
sizeof
(
bMsg
->
db
));
bMsg
->
db
[
sizeof
(
bMsg
->
db
)
-
1
]
=
0
;
bMsg
->
vgroupVersion
=
bInput
->
vgroupVersion
;
bMsg
->
dbGroupVersion
=
bInput
->
dbGroupVersion
;
bMsg
->
vgVersion
=
bInput
->
vgVersion
;
*
msgLen
=
(
int32_t
)
sizeof
(
*
bMsg
);
...
...
@@ -90,58 +78,12 @@ int32_t queryBuildUseDbMsg(void* input, char **msg, int32_t msgSize, int32_t *ms
}
int32_t
queryProcessVgroupListRsp
(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
{
if
(
NULL
==
output
||
NULL
==
msg
||
msgSize
<=
0
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
SVgroupListRspMsg
*
pRsp
=
(
SVgroupListRspMsg
*
)
msg
;
pRsp
->
vgroupNum
=
htonl
(
pRsp
->
vgroupNum
);
pRsp
->
vgroupVersion
=
htonl
(
pRsp
->
vgroupVersion
);
if
(
pRsp
->
vgroupNum
<
0
)
{
qError
(
"vgroup number[%d] in rsp is invalid"
,
pRsp
->
vgroupNum
);
return
TSDB_CODE_TSC_VALUE_OUT_OF_RANGE
;
}
if
(
pRsp
->
vgroupVersion
<
0
)
{
qError
(
"vgroup vgroupVersion[%d] in rsp is invalid"
,
pRsp
->
vgroupVersion
);
return
TSDB_CODE_TSC_VALUE_OUT_OF_RANGE
;
}
if
(
msgSize
!=
(
pRsp
->
vgroupNum
*
sizeof
(
pRsp
->
vgroupInfo
[
0
])
+
sizeof
(
*
pRsp
)))
{
qError
(
"vgroup list msg size mis-match, msgSize:%d, vgroup number:%d"
,
msgSize
,
pRsp
->
vgroupNum
);
return
TSDB_CODE_TSC_VALUE_OUT_OF_RANGE
;
}
// keep SVgroupListInfo/SVgroupListRspMsg the same
*
(
SVgroupListInfo
**
)
output
=
(
SVgroupListInfo
*
)
msg
;
if
(
pRsp
->
vgroupNum
==
0
)
{
return
TSDB_CODE_SUCCESS
;
}
for
(
int32_t
i
=
0
;
i
<
pRsp
->
vgroupNum
;
++
i
)
{
pRsp
->
vgroupInfo
[
i
].
vgId
=
htonl
(
pRsp
->
vgroupInfo
[
i
].
vgId
);
for
(
int32_t
n
=
0
;
n
<
pRsp
->
vgroupInfo
[
i
].
numOfEps
;
++
n
)
{
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
=
htonl
(
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
);
}
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryProcessUseDBRsp
(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
{
if
(
NULL
==
output
||
NULL
==
msg
||
msgSize
<=
0
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
SUseDbRsp
Msg
*
pRsp
=
(
SUseDbRspMsg
*
)
msg
;
SUseDbRsp
*
pRsp
=
(
SUseDbRsp
*
)
msg
;
SUseDbOutput
*
pOut
=
(
SUseDbOutput
*
)
output
;
int32_t
code
=
0
;
...
...
@@ -150,104 +92,52 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
return
TSDB_CODE_TSC_VALUE_OUT_OF_RANGE
;
}
pRsp
->
vgroupVersion
=
htonl
(
pRsp
->
vgroupVersion
);
pRsp
->
dbVgroupVersion
=
htonl
(
pRsp
->
dbVgroupVersion
);
pRsp
->
vgroupNum
=
htonl
(
pRsp
->
vgroupNum
);
pRsp
->
dbVgroupNum
=
htonl
(
pRsp
->
dbVgroupNum
);
if
(
pRsp
->
vgroupNum
<
0
)
{
qError
(
"invalid vgroup number[%d]"
,
pRsp
->
vgroupNum
);
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
pRsp
->
vgVersion
=
htonl
(
pRsp
->
vgVersion
);
pRsp
->
vgNum
=
htonl
(
pRsp
->
vgNum
);
if
(
pRsp
->
dbVgroup
Num
<
0
)
{
qError
(
"invalid db
vgroup number[%d]"
,
pRsp
->
dbVgroup
Num
);
if
(
pRsp
->
vg
Num
<
0
)
{
qError
(
"invalid db
[%s] vgroup number[%d]"
,
pRsp
->
db
,
pRsp
->
vg
Num
);
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
int32_t
expectSize
=
pRsp
->
vg
roupNum
*
sizeof
(
pRsp
->
vgroupInfo
[
0
])
+
pRsp
->
dbVgroupNum
*
sizeof
(
int32_t
)
+
sizeof
(
*
pRsp
);
int32_t
expectSize
=
pRsp
->
vg
Num
*
sizeof
(
pRsp
->
vgroupInfo
[
0
]
)
+
sizeof
(
*
pRsp
);
if
(
msgSize
!=
expectSize
)
{
qError
(
"
vgroup list msg size mis-match, msgSize:%d, expected:%d, vgroup number:%d, db vgroup number:%d"
,
msgSize
,
expectSize
,
pRsp
->
vgroupNum
,
pRsp
->
dbVgroup
Num
);
qError
(
"
use db rsp size mis-match, msgSize:%d, expected:%d, vgnumber:%d"
,
msgSize
,
expectSize
,
pRsp
->
vg
Num
);
return
TSDB_CODE_TSC_VALUE_OUT_OF_RANGE
;
}
if
(
pRsp
->
vgroupVersion
<
0
)
{
qInfo
(
"no new vgroup list info"
);
if
(
pRsp
->
vgroupNum
!=
0
)
{
qError
(
"invalid vgroup number[%d] for no new vgroup list case"
,
pRsp
->
vgroupNum
);
return
TSDB_CODE_TSC_INVALID_VALUE
;
}
}
else
{
int32_t
s
=
sizeof
(
*
pOut
->
vgroupList
)
+
sizeof
(
pOut
->
vgroupList
->
vgroupInfo
[
0
])
*
pRsp
->
vgroupNum
;
pOut
->
vgroupList
=
calloc
(
1
,
s
);
if
(
NULL
==
pOut
->
vgroupList
)
{
qError
(
"calloc size[%d] failed"
,
s
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
pOut
->
vgroupList
->
vgroupNum
=
pRsp
->
vgroupNum
;
pOut
->
vgroupList
->
vgroupVersion
=
pRsp
->
vgroupVersion
;
for
(
int32_t
i
=
0
;
i
<
pRsp
->
vgroupNum
;
++
i
)
{
pRsp
->
vgroupInfo
[
i
].
vgId
=
htonl
(
pRsp
->
vgroupInfo
[
i
].
vgId
);
for
(
int32_t
n
=
0
;
n
<
pRsp
->
vgroupInfo
[
i
].
numOfEps
;
++
n
)
{
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
=
htonl
(
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
);
}
memcpy
(
&
pOut
->
vgroupList
->
vgroupInfo
[
i
],
&
pRsp
->
vgroupInfo
[
i
],
sizeof
(
pRsp
->
vgroupInfo
[
i
]));
}
pOut
->
dbVgroup
.
vgVersion
=
pRsp
->
vgVersion
;
pOut
->
dbVgroup
.
hashMethod
=
pRsp
->
hashMethod
;
pOut
->
dbVgroup
.
vgInfo
=
taosHashInit
(
pRsp
->
vgNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pOut
->
dbVgroup
.
vgInfo
)
{
qError
(
"hash init[%d] failed"
,
pRsp
->
vgNum
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
int32_t
*
vgIdList
=
(
int32_t
*
)((
char
*
)
pRsp
->
vgroupInfo
+
sizeof
(
pRsp
->
vgroupInfo
[
0
])
*
pRsp
->
vgroupNum
);
memcpy
(
pOut
->
db
,
pRsp
->
db
,
sizeof
(
pOut
->
db
));
if
(
pRsp
->
dbVgroupVersion
<
0
)
{
qInfo
(
"no new vgroup info for db[%s]"
,
pRsp
->
db
);
}
else
{
pOut
->
dbVgroup
=
calloc
(
1
,
sizeof
(
*
pOut
->
dbVgroup
));
if
(
NULL
==
pOut
->
dbVgroup
)
{
qError
(
"calloc size[%d] failed"
,
(
int32_t
)
sizeof
(
*
pOut
->
dbVgroup
));
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
_exit
;
}
for
(
int32_t
i
=
0
;
i
<
pRsp
->
vgNum
;
++
i
)
{
pRsp
->
vgroupInfo
[
i
].
vgId
=
htonl
(
pRsp
->
vgroupInfo
[
i
].
vgId
);
pRsp
->
vgroupInfo
[
i
].
hashBegin
=
htonl
(
pRsp
->
vgroupInfo
[
i
].
hashBegin
);
pRsp
->
vgroupInfo
[
i
].
hashEnd
=
htonl
(
pRsp
->
vgroupInfo
[
i
].
hashEnd
);
pOut
->
dbVgroup
->
vgId
=
taosArrayInit
(
pRsp
->
dbVgroupNum
,
sizeof
(
int32_t
));
if
(
NULL
==
pOut
->
dbVgroup
->
vgId
)
{
qError
(
"taosArrayInit size[%d] failed"
,
pRsp
->
dbVgroupNum
);
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
goto
_exit
;
}
pOut
->
dbVgroup
->
vgroupVersion
=
pRsp
->
dbVgroupVersion
;
pOut
->
dbVgroup
->
hashRange
=
htonl
(
pRsp
->
dbHashRange
);
pOut
->
dbVgroup
->
hashType
=
htonl
(
pRsp
->
dbHashType
);
if
(
pOut
->
dbVgroup
->
hashRange
<
0
)
{
qError
(
"invalid hashRange[%d] for db[%s]"
,
pOut
->
dbVgroup
->
hashRange
,
pRsp
->
db
);
code
=
TSDB_CODE_TSC_INVALID_INPUT
;
goto
_exit
;
for
(
int32_t
n
=
0
;
n
<
pRsp
->
vgroupInfo
[
i
].
numOfEps
;
++
n
)
{
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
=
htonl
(
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
);
}
for
(
int32_t
i
=
0
;
i
<
pRsp
->
dbVgroupNum
;
++
i
)
{
*
(
vgIdList
+
i
)
=
htonl
(
*
(
vgIdList
+
i
));
taosArrayPush
(
pOut
->
dbVgroup
->
vgId
,
vgIdList
+
i
)
;
if
(
0
!=
taosHashPut
(
pOut
->
dbVgroup
.
vgInfo
,
&
pRsp
->
vgroupInfo
[
i
].
vgId
,
sizeof
(
pRsp
->
vgroupInfo
[
i
].
vgId
),
&
pRsp
->
vgroupInfo
[
i
],
sizeof
(
pRsp
->
vgroupInfo
[
i
])))
{
qError
(
"hash push failed"
);
goto
_return
;
}
}
memcpy
(
pOut
->
db
,
pRsp
->
db
,
sizeof
(
pOut
->
db
));
return
code
;
_exit:
if
(
pOut
->
dbVgroup
&&
pOut
->
dbVgroup
->
vgId
)
{
taosArrayDestroy
(
pOut
->
dbVgroup
->
vgId
);
pOut
->
dbVgroup
->
vgId
=
NULL
;
_return:
if
(
pOut
)
{
tfree
(
pOut
->
dbVgroup
.
vgInfo
);
}
tfree
(
pOut
->
dbVgroup
);
tfree
(
pOut
->
vgroupList
);
return
code
;
}
...
...
@@ -375,11 +265,9 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
void
msgInit
()
{
queryBuildMsg
[
TSDB_MSG_TYPE_TABLE_META
]
=
queryBuildTableMetaReqMsg
;
queryBuildMsg
[
TSDB_MSG_TYPE_VGROUP_LIST
]
=
queryBuildVgroupListReqMsg
;
queryBuildMsg
[
TSDB_MSG_TYPE_USE_DB
]
=
queryBuildUseDbMsg
;
queryProcessMsgRsp
[
TSDB_MSG_TYPE_TABLE_META
]
=
queryProcessTableMetaRsp
;
queryProcessMsgRsp
[
TSDB_MSG_TYPE_VGROUP_LIST
]
=
queryProcessVgroupListRsp
;
queryProcessMsgRsp
[
TSDB_MSG_TYPE_USE_DB
]
=
queryProcessUseDBRsp
;
/*
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录