Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a5e9b14d
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1193
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
a5e9b14d
编写于
5月 06, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'feature/qnode' of github.com:taosdata/TDengine into feature/qnode
上级
d12532b4
9e65741b
变更
18
显示空白变更内容
内联
并排
Showing
18 changed file
with
732 addition
and
226 deletion
+732
-226
include/libs/transport/trpc.h
include/libs/transport/trpc.h
+1
-0
include/os/osSocket.h
include/os/osSocket.h
+1
-0
source/dnode/mgmt/implement/src/dmTransport.c
source/dnode/mgmt/implement/src/dmTransport.c
+7
-5
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+1
-0
source/dnode/mnode/impl/inc/mndSma.h
source/dnode/mnode/impl/inc/mndSma.h
+0
-1
source/dnode/mnode/impl/inc/mndVgroup.h
source/dnode/mnode/impl/inc/mndVgroup.h
+1
-0
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+60
-159
source/dnode/mnode/impl/src/mndSma.c
source/dnode/mnode/impl/src/mndSma.c
+48
-1
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+2
-2
source/dnode/mnode/impl/src/mndUser.c
source/dnode/mnode/impl/src/mndUser.c
+10
-2
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+114
-52
source/libs/transport/src/trans.c
source/libs/transport/src/trans.c
+13
-1
source/libs/transport/src/transSrv.c
source/libs/transport/src/transSrv.c
+4
-1
source/os/src/osSocket.c
source/os/src/osSocket.c
+42
-0
tests/system-test/1-insert/insertWithMoreVgroup.py
tests/system-test/1-insert/insertWithMoreVgroup.py
+291
-0
tests/system-test/2-query/smaTest.py
tests/system-test/2-query/smaTest.py
+131
-0
tools/shell/src/shellEngine.c
tools/shell/src/shellEngine.c
+3
-0
tools/shell/src/shellNettest.c
tools/shell/src/shellNettest.c
+3
-2
未找到文件。
include/libs/transport/trpc.h
浏览文件 @
a5e9b14d
...
@@ -68,6 +68,7 @@ typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, cha
...
@@ -68,6 +68,7 @@ typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, cha
typedef
bool
(
*
RpcRfp
)(
int32_t
code
);
typedef
bool
(
*
RpcRfp
)(
int32_t
code
);
typedef
struct
SRpcInit
{
typedef
struct
SRpcInit
{
char
localFqdn
[
TSDB_FQDN_LEN
];
uint16_t
localPort
;
// local port
uint16_t
localPort
;
// local port
char
*
label
;
// for debug purpose
char
*
label
;
// for debug purpose
int
numOfThreads
;
// number of threads to handle connections
int
numOfThreads
;
// number of threads to handle connections
...
...
include/os/osSocket.h
浏览文件 @
a5e9b14d
...
@@ -161,6 +161,7 @@ int taosCreateSocketWithTimeOutOpt(uint32_t conn_timeout_sec);
...
@@ -161,6 +161,7 @@ int taosCreateSocketWithTimeOutOpt(uint32_t conn_timeout_sec);
TdSocketPtr
taosOpenUdpSocket
(
uint32_t
localIp
,
uint16_t
localPort
);
TdSocketPtr
taosOpenUdpSocket
(
uint32_t
localIp
,
uint16_t
localPort
);
TdSocketPtr
taosOpenTcpClientSocket
(
uint32_t
ip
,
uint16_t
port
,
uint32_t
localIp
);
TdSocketPtr
taosOpenTcpClientSocket
(
uint32_t
ip
,
uint16_t
port
,
uint32_t
localIp
);
bool
taosValidIpAndPort
(
uint32_t
ip
,
uint16_t
port
);
TdSocketServerPtr
taosOpenTcpServerSocket
(
uint32_t
ip
,
uint16_t
port
);
TdSocketServerPtr
taosOpenTcpServerSocket
(
uint32_t
ip
,
uint16_t
port
);
int32_t
taosKeepTcpAlive
(
TdSocketPtr
pSocket
);
int32_t
taosKeepTcpAlive
(
TdSocketPtr
pSocket
);
TdSocketPtr
taosAcceptTcpConnectSocket
(
TdSocketServerPtr
pServerSocket
,
struct
sockaddr
*
destAddr
,
int
*
addrLen
);
TdSocketPtr
taosAcceptTcpConnectSocket
(
TdSocketServerPtr
pServerSocket
,
struct
sockaddr
*
destAddr
,
int
*
addrLen
);
...
...
source/dnode/mgmt/implement/src/dmTransport.c
浏览文件 @
a5e9b14d
...
@@ -130,10 +130,10 @@ _OVER:
...
@@ -130,10 +130,10 @@ _OVER:
}
}
static
void
dmProcessMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
static
void
dmProcessMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SDnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
SDnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
tmsg_t
msgType
=
pMsg
->
msgType
;
tmsg_t
msgType
=
pMsg
->
msgType
;
bool
isReq
=
msgType
&
1u
;
bool
isReq
=
msgType
&
1u
;
SMsgHandle
*
pHandle
=
&
pTrans
->
msgHandles
[
TMSG_INDEX
(
msgType
)];
SMsgHandle
*
pHandle
=
&
pTrans
->
msgHandles
[
TMSG_INDEX
(
msgType
)];
SMgmtWrapper
*
pWrapper
=
pHandle
->
pNdWrapper
;
SMgmtWrapper
*
pWrapper
=
pHandle
->
pNdWrapper
;
if
(
msgType
==
TDMT_DND_SERVER_STATUS
)
{
if
(
msgType
==
TDMT_DND_SERVER_STATUS
)
{
...
@@ -517,7 +517,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s
...
@@ -517,7 +517,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s
SAuthReq
authReq
=
{
0
};
SAuthReq
authReq
=
{
0
};
tstrncpy
(
authReq
.
user
,
user
,
TSDB_USER_LEN
);
tstrncpy
(
authReq
.
user
,
user
,
TSDB_USER_LEN
);
int32_t
contLen
=
tSerializeSAuthReq
(
NULL
,
0
,
&
authReq
);
int32_t
contLen
=
tSerializeSAuthReq
(
NULL
,
0
,
&
authReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
tSerializeSAuthReq
(
pReq
,
contLen
,
&
authReq
);
tSerializeSAuthReq
(
pReq
,
contLen
,
&
authReq
);
SRpcMsg
rpcMsg
=
{.
pCont
=
pReq
,
.
contLen
=
contLen
,
.
msgType
=
TDMT_MND_AUTH
,
.
ahandle
=
(
void
*
)
9528
};
SRpcMsg
rpcMsg
=
{.
pCont
=
pReq
,
.
contLen
=
contLen
,
.
msgType
=
TDMT_MND_AUTH
,
.
ahandle
=
(
void
*
)
9528
};
...
@@ -547,6 +547,8 @@ static int32_t dmInitServer(SDnode *pDnode) {
...
@@ -547,6 +547,8 @@ static int32_t dmInitServer(SDnode *pDnode) {
SDnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
SDnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
SRpcInit
rpcInit
=
{
0
};
SRpcInit
rpcInit
=
{
0
};
strncpy
(
rpcInit
.
localFqdn
,
pDnode
->
data
.
localFqdn
,
strlen
(
pDnode
->
data
.
localFqdn
));
rpcInit
.
localPort
=
pDnode
->
data
.
serverPort
;
rpcInit
.
localPort
=
pDnode
->
data
.
serverPort
;
rpcInit
.
label
=
"DND"
;
rpcInit
.
label
=
"DND"
;
rpcInit
.
numOfThreads
=
tsNumOfRpcThreads
;
rpcInit
.
numOfThreads
=
tsNumOfRpcThreads
;
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
a5e9b14d
...
@@ -258,6 +258,7 @@ typedef struct {
...
@@ -258,6 +258,7 @@ typedef struct {
int32_t
authVersion
;
int32_t
authVersion
;
SHashObj
*
readDbs
;
SHashObj
*
readDbs
;
SHashObj
*
writeDbs
;
SHashObj
*
writeDbs
;
SRWLatch
lock
;
}
SUserObj
;
}
SUserObj
;
typedef
struct
{
typedef
struct
{
...
...
source/dnode/mnode/impl/inc/mndSma.h
浏览文件 @
a5e9b14d
...
@@ -26,7 +26,6 @@ int32_t mndInitSma(SMnode *pMnode);
...
@@ -26,7 +26,6 @@ int32_t mndInitSma(SMnode *pMnode);
void
mndCleanupSma
(
SMnode
*
pMnode
);
void
mndCleanupSma
(
SMnode
*
pMnode
);
SSmaObj
*
mndAcquireSma
(
SMnode
*
pMnode
,
char
*
smaName
);
SSmaObj
*
mndAcquireSma
(
SMnode
*
pMnode
,
char
*
smaName
);
void
mndReleaseSma
(
SMnode
*
pMnode
,
SSmaObj
*
pSma
);
void
mndReleaseSma
(
SMnode
*
pMnode
,
SSmaObj
*
pSma
);
int32_t
mndProcessGetSmaReq
(
SMnode
*
pMnode
,
SUserIndexReq
*
indexReq
,
SUserIndexRsp
*
rsp
,
bool
*
exist
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/dnode/mnode/impl/inc/mndVgroup.h
浏览文件 @
a5e9b14d
...
@@ -33,6 +33,7 @@ int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId);
...
@@ -33,6 +33,7 @@ int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId);
void
*
mndBuildCreateVnodeReq
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
int32_t
*
pContLen
);
void
*
mndBuildCreateVnodeReq
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
int32_t
*
pContLen
);
void
*
mndBuildDropVnodeReq
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
int32_t
*
pContLen
);
void
*
mndBuildDropVnodeReq
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
int32_t
*
pContLen
);
void
*
mndBuildAlterVnodeReq
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
int32_t
*
pContLen
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
a5e9b14d
...
@@ -44,16 +44,17 @@ static int32_t mndProcessCompactDbReq(SNodeMsg *pReq);
...
@@ -44,16 +44,17 @@ static int32_t mndProcessCompactDbReq(SNodeMsg *pReq);
static
int32_t
mndRetrieveDbs
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rowsCapacity
);
static
int32_t
mndRetrieveDbs
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rowsCapacity
);
static
void
mndCancelGetNextDb
(
SMnode
*
pMnode
,
void
*
pIter
);
static
void
mndCancelGetNextDb
(
SMnode
*
pMnode
,
void
*
pIter
);
static
int32_t
mndProcessGetDbCfgReq
(
SNodeMsg
*
pReq
);
static
int32_t
mndProcessGetDbCfgReq
(
SNodeMsg
*
pReq
);
static
int32_t
mndProcessGetIndexReq
(
SNodeMsg
*
pReq
);
int32_t
mndInitDb
(
SMnode
*
pMnode
)
{
int32_t
mndInitDb
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_DB
,
SSdbTable
table
=
{
.
sdbType
=
SDB_DB
,
.
keyType
=
SDB_KEY_BINARY
,
.
keyType
=
SDB_KEY_BINARY
,
.
encodeFp
=
(
SdbEncodeFp
)
mndDbActionEncode
,
.
encodeFp
=
(
SdbEncodeFp
)
mndDbActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndDbActionDecode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndDbActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndDbActionInsert
,
.
insertFp
=
(
SdbInsertFp
)
mndDbActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndDbActionUpdate
,
.
updateFp
=
(
SdbUpdateFp
)
mndDbActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndDbActionDelete
};
.
deleteFp
=
(
SdbDeleteFp
)
mndDbActionDelete
,
};
mndSetMsgHandle
(
pMnode
,
TDMT_MND_CREATE_DB
,
mndProcessCreateDbReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_CREATE_DB
,
mndProcessCreateDbReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_ALTER_DB
,
mndProcessAlterDbReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_ALTER_DB
,
mndProcessAlterDbReq
);
...
@@ -61,7 +62,6 @@ int32_t mndInitDb(SMnode *pMnode) {
...
@@ -61,7 +62,6 @@ int32_t mndInitDb(SMnode *pMnode) {
mndSetMsgHandle
(
pMnode
,
TDMT_MND_USE_DB
,
mndProcessUseDbReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_USE_DB
,
mndProcessUseDbReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_COMPACT_DB
,
mndProcessCompactDbReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_COMPACT_DB
,
mndProcessCompactDbReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_GET_DB_CFG
,
mndProcessGetDbCfgReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_GET_DB_CFG
,
mndProcessGetDbCfgReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_GET_INDEX
,
mndProcessGetIndexReq
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_DB
,
mndRetrieveDbs
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_DB
,
mndRetrieveDbs
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_DB
,
mndCancelGetNextDb
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_DB
,
mndCancelGetNextDb
);
...
@@ -194,6 +194,7 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
...
@@ -194,6 +194,7 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
}
}
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
DB_RESERVE_SIZE
,
_OVER
)
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
DB_RESERVE_SIZE
,
_OVER
)
taosInitRWLatch
(
&
pDb
->
lock
);
terrno
=
0
;
terrno
=
0
;
...
@@ -222,17 +223,29 @@ static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb) {
...
@@ -222,17 +223,29 @@ static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb) {
static
int32_t
mndDbActionUpdate
(
SSdb
*
pSdb
,
SDbObj
*
pOld
,
SDbObj
*
pNew
)
{
static
int32_t
mndDbActionUpdate
(
SSdb
*
pSdb
,
SDbObj
*
pOld
,
SDbObj
*
pNew
)
{
mTrace
(
"db:%s, perform update action, old row:%p new row:%p"
,
pOld
->
name
,
pOld
,
pNew
);
mTrace
(
"db:%s, perform update action, old row:%p new row:%p"
,
pOld
->
name
,
pOld
,
pNew
);
taosWLockLatch
(
&
pOld
->
lock
);
taosWLockLatch
(
&
pOld
->
lock
);
SArray
*
pOldRetensions
=
pOld
->
cfg
.
pRetensions
;
pOld
->
updateTime
=
pNew
->
updateTime
;
pOld
->
updateTime
=
pNew
->
updateTime
;
pOld
->
cfgVersion
=
pNew
->
cfgVersion
;
pOld
->
cfgVersion
=
pNew
->
cfgVersion
;
pOld
->
vgVersion
=
pNew
->
vgVersion
;
pOld
->
vgVersion
=
pNew
->
vgVersion
;
memcpy
(
&
pOld
->
cfg
,
&
pNew
->
cfg
,
sizeof
(
SDbCfg
));
pOld
->
cfg
.
buffer
=
pNew
->
cfg
.
buffer
;
pNew
->
cfg
.
pRetensions
=
pOldRetensions
;
pOld
->
cfg
.
pages
=
pNew
->
cfg
.
pages
;
pOld
->
cfg
.
pageSize
=
pNew
->
cfg
.
pageSize
;
pOld
->
cfg
.
daysPerFile
=
pNew
->
cfg
.
daysPerFile
;
pOld
->
cfg
.
daysToKeep0
=
pNew
->
cfg
.
daysToKeep0
;
pOld
->
cfg
.
daysToKeep1
=
pNew
->
cfg
.
daysToKeep1
;
pOld
->
cfg
.
daysToKeep2
=
pNew
->
cfg
.
daysToKeep2
;
pOld
->
cfg
.
fsyncPeriod
=
pNew
->
cfg
.
fsyncPeriod
;
pOld
->
cfg
.
walLevel
=
pNew
->
cfg
.
walLevel
;
pOld
->
cfg
.
strict
=
pNew
->
cfg
.
strict
;
pOld
->
cfg
.
cacheLastRow
=
pNew
->
cfg
.
cacheLastRow
;
pOld
->
cfg
.
replications
=
pNew
->
cfg
.
replications
;
taosWUnLockLatch
(
&
pOld
->
lock
);
taosWUnLockLatch
(
&
pOld
->
lock
);
return
0
;
return
0
;
}
}
static
int32_t
mndGetGlobalVgroupVersion
(
SMnode
*
pMnode
)
{
return
sdbGetTableVer
(
pMnode
->
pSdb
,
SDB_VGROUP
);
}
static
inline
int32_t
mndGetGlobalVgroupVersion
(
SMnode
*
pMnode
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
return
sdbGetTableVer
(
pSdb
,
SDB_VGROUP
);
}
SDbObj
*
mndAcquireDb
(
SMnode
*
pMnode
,
const
char
*
db
)
{
SDbObj
*
mndAcquireDb
(
SMnode
*
pMnode
,
const
char
*
db
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
...
@@ -638,69 +651,7 @@ static int32_t mndSetAlterDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *p
...
@@ -638,69 +651,7 @@ static int32_t mndSetAlterDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *p
return
0
;
return
0
;
}
}
void
*
mndBuildAlterVnodeReq
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
int32_t
*
pContLen
)
{
static
int32_t
mndBuildAlterVgroupAction
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
)
{
SAlterVnodeReq
alterReq
=
{
0
};
alterReq
.
vgVersion
=
pVgroup
->
version
;
alterReq
.
buffer
=
pDb
->
cfg
.
buffer
;
alterReq
.
pages
=
pDb
->
cfg
.
pages
;
alterReq
.
pageSize
=
pDb
->
cfg
.
pageSize
;
alterReq
.
daysPerFile
=
pDb
->
cfg
.
daysPerFile
;
alterReq
.
daysToKeep0
=
pDb
->
cfg
.
daysToKeep0
;
alterReq
.
daysToKeep1
=
pDb
->
cfg
.
daysToKeep1
;
alterReq
.
daysToKeep2
=
pDb
->
cfg
.
daysToKeep2
;
alterReq
.
fsyncPeriod
=
pDb
->
cfg
.
fsyncPeriod
;
alterReq
.
walLevel
=
pDb
->
cfg
.
walLevel
;
alterReq
.
strict
=
pDb
->
cfg
.
strict
;
alterReq
.
cacheLastRow
=
pDb
->
cfg
.
cacheLastRow
;
alterReq
.
replica
=
pVgroup
->
replica
;
alterReq
.
selfIndex
=
-
1
;
for
(
int32_t
v
=
0
;
v
<
pVgroup
->
replica
;
++
v
)
{
SReplica
*
pReplica
=
&
alterReq
.
replicas
[
v
];
SVnodeGid
*
pVgid
=
&
pVgroup
->
vnodeGid
[
v
];
SDnodeObj
*
pVgidDnode
=
mndAcquireDnode
(
pMnode
,
pVgid
->
dnodeId
);
if
(
pVgidDnode
==
NULL
)
{
return
NULL
;
}
pReplica
->
id
=
pVgidDnode
->
id
;
pReplica
->
port
=
pVgidDnode
->
port
;
memcpy
(
pReplica
->
fqdn
,
pVgidDnode
->
fqdn
,
TSDB_FQDN_LEN
);
mndReleaseDnode
(
pMnode
,
pVgidDnode
);
if
(
pDnode
->
id
==
pVgid
->
dnodeId
)
{
alterReq
.
selfIndex
=
v
;
}
}
if
(
alterReq
.
selfIndex
==
-
1
)
{
terrno
=
TSDB_CODE_MND_APP_ERROR
;
return
NULL
;
}
int32_t
contLen
=
tSerializeSAlterVnodeReq
(
NULL
,
0
,
&
alterReq
);
if
(
contLen
<
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
contLen
+=
+
sizeof
(
SMsgHead
);
void
*
pReq
=
taosMemoryMalloc
(
contLen
);
if
(
pReq
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
SMsgHead
*
pHead
=
pReq
;
pHead
->
contLen
=
htonl
(
contLen
);
pHead
->
vgId
=
htonl
(
pVgroup
->
vgId
);
tSerializeSAlterVnodeReq
((
char
*
)
pReq
+
sizeof
(
SMsgHead
),
contLen
,
&
alterReq
);
*
pContLen
=
contLen
;
return
pReq
;
}
static
int32_t
mndBuilAlterVgroupAction
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
)
{
for
(
int32_t
vn
=
0
;
vn
<
pVgroup
->
replica
;
++
vn
)
{
for
(
int32_t
vn
=
0
;
vn
<
pVgroup
->
replica
;
++
vn
)
{
STransAction
action
=
{
0
};
STransAction
action
=
{
0
};
SVnodeGid
*
pVgid
=
pVgroup
->
vnodeGid
+
vn
;
SVnodeGid
*
pVgid
=
pVgroup
->
vnodeGid
+
vn
;
...
@@ -736,7 +687,7 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
...
@@ -736,7 +687,7 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
if
(
pIter
==
NULL
)
break
;
if
(
pIter
==
NULL
)
break
;
if
(
pVgroup
->
dbUid
==
pNew
->
uid
)
{
if
(
pVgroup
->
dbUid
==
pNew
->
uid
)
{
if
(
mndBuilAlterVgroupAction
(
pMnode
,
pTrans
,
pNew
,
pVgroup
)
!=
0
)
{
if
(
mndBuil
d
AlterVgroupAction
(
pMnode
,
pTrans
,
pNew
,
pVgroup
)
!=
0
)
{
sdbCancelFetch
(
pSdb
,
pIter
);
sdbCancelFetch
(
pSdb
,
pIter
);
sdbRelease
(
pSdb
,
pVgroup
);
sdbRelease
(
pSdb
,
pVgroup
);
return
-
1
;
return
-
1
;
...
@@ -752,19 +703,19 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
...
@@ -752,19 +703,19 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
static
int32_t
mndAlterDb
(
SMnode
*
pMnode
,
SNodeMsg
*
pReq
,
SDbObj
*
pOld
,
SDbObj
*
pNew
)
{
static
int32_t
mndAlterDb
(
SMnode
*
pMnode
,
SNodeMsg
*
pReq
,
SDbObj
*
pOld
,
SDbObj
*
pNew
)
{
int32_t
code
=
-
1
;
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_TYPE_ALTER_DB
,
&
pReq
->
rpcMsg
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_TYPE_ALTER_DB
,
&
pReq
->
rpcMsg
);
if
(
pTrans
==
NULL
)
goto
UPDATE_DB
_OVER
;
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to alter db:%s"
,
pTrans
->
id
,
pOld
->
name
);
mDebug
(
"trans:%d, used to alter db:%s"
,
pTrans
->
id
,
pOld
->
name
);
mndTransSetDbInfo
(
pTrans
,
pOld
);
mndTransSetDbInfo
(
pTrans
,
pOld
);
if
(
mndSetAlterDbRedoLogs
(
pMnode
,
pTrans
,
pOld
,
pNew
)
!=
0
)
goto
UPDATE_DB
_OVER
;
if
(
mndSetAlterDbRedoLogs
(
pMnode
,
pTrans
,
pOld
,
pNew
)
!=
0
)
goto
_OVER
;
if
(
mndSetAlterDbCommitLogs
(
pMnode
,
pTrans
,
pOld
,
pNew
)
!=
0
)
goto
UPDATE_DB
_OVER
;
if
(
mndSetAlterDbCommitLogs
(
pMnode
,
pTrans
,
pOld
,
pNew
)
!=
0
)
goto
_OVER
;
if
(
mndSetAlterDbRedoActions
(
pMnode
,
pTrans
,
pOld
,
pNew
)
!=
0
)
goto
UPDATE_DB
_OVER
;
if
(
mndSetAlterDbRedoActions
(
pMnode
,
pTrans
,
pOld
,
pNew
)
!=
0
)
goto
_OVER
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
UPDATE_DB
_OVER
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
code
=
0
;
code
=
0
;
UPDATE_DB
_OVER:
_OVER:
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
return
code
;
return
code
;
}
}
...
@@ -778,7 +729,7 @@ static int32_t mndProcessAlterDbReq(SNodeMsg *pReq) {
...
@@ -778,7 +729,7 @@ static int32_t mndProcessAlterDbReq(SNodeMsg *pReq) {
if
(
tDeserializeSAlterDbReq
(
pReq
->
rpcMsg
.
pCont
,
pReq
->
rpcMsg
.
contLen
,
&
alterReq
)
!=
0
)
{
if
(
tDeserializeSAlterDbReq
(
pReq
->
rpcMsg
.
pCont
,
pReq
->
rpcMsg
.
contLen
,
&
alterReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
terrno
=
TSDB_CODE_INVALID_MSG
;
goto
ALTER_DB
_OVER
;
goto
_OVER
;
}
}
mDebug
(
"db:%s, start to alter"
,
alterReq
.
db
);
mDebug
(
"db:%s, start to alter"
,
alterReq
.
db
);
...
@@ -786,24 +737,26 @@ static int32_t mndProcessAlterDbReq(SNodeMsg *pReq) {
...
@@ -786,24 +737,26 @@ static int32_t mndProcessAlterDbReq(SNodeMsg *pReq) {
pDb
=
mndAcquireDb
(
pMnode
,
alterReq
.
db
);
pDb
=
mndAcquireDb
(
pMnode
,
alterReq
.
db
);
if
(
pDb
==
NULL
)
{
if
(
pDb
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DB_NOT_EXIST
;
terrno
=
TSDB_CODE_MND_DB_NOT_EXIST
;
goto
ALTER_DB
_OVER
;
goto
_OVER
;
}
}
pUser
=
mndAcquireUser
(
pMnode
,
pReq
->
user
);
pUser
=
mndAcquireUser
(
pMnode
,
pReq
->
user
);
if
(
pUser
==
NULL
)
{
if
(
pUser
==
NULL
)
{
goto
ALTER_DB
_OVER
;
goto
_OVER
;
}
}
if
(
mndCheckAlterDropCompactDbAuth
(
pUser
,
pDb
)
!=
0
)
{
if
(
mndCheckAlterDropCompactDbAuth
(
pUser
,
pDb
)
!=
0
)
{
goto
ALTER_DB
_OVER
;
goto
_OVER
;
}
}
SDbObj
dbObj
=
{
0
};
SDbObj
dbObj
=
{
0
};
memcpy
(
&
dbObj
,
pDb
,
sizeof
(
SDbObj
));
memcpy
(
&
dbObj
,
pDb
,
sizeof
(
SDbObj
));
dbObj
.
cfg
.
numOfRetensions
=
0
;
dbObj
.
cfg
.
pRetensions
=
NULL
;
code
=
mndSetDbCfgFromAlterDbReq
(
&
dbObj
,
&
alterReq
);
code
=
mndSetDbCfgFromAlterDbReq
(
&
dbObj
,
&
alterReq
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
goto
ALTER_DB
_OVER
;
goto
_OVER
;
}
}
dbObj
.
cfgVersion
++
;
dbObj
.
cfgVersion
++
;
...
@@ -811,7 +764,7 @@ static int32_t mndProcessAlterDbReq(SNodeMsg *pReq) {
...
@@ -811,7 +764,7 @@ static int32_t mndProcessAlterDbReq(SNodeMsg *pReq) {
code
=
mndAlterDb
(
pMnode
,
pReq
,
pDb
,
&
dbObj
);
code
=
mndAlterDb
(
pMnode
,
pReq
,
pDb
,
&
dbObj
);
if
(
code
==
0
)
code
=
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
if
(
code
==
0
)
code
=
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
ALTER_DB
_OVER:
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
mError
(
"db:%s, failed to alter since %s"
,
alterReq
.
db
,
terrstr
());
mError
(
"db:%s, failed to alter since %s"
,
alterReq
.
db
,
terrstr
());
}
}
...
@@ -831,13 +784,13 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) {
...
@@ -831,13 +784,13 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) {
if
(
tDeserializeSDbCfgReq
(
pReq
->
rpcMsg
.
pCont
,
pReq
->
rpcMsg
.
contLen
,
&
cfgReq
)
!=
0
)
{
if
(
tDeserializeSDbCfgReq
(
pReq
->
rpcMsg
.
pCont
,
pReq
->
rpcMsg
.
contLen
,
&
cfgReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
terrno
=
TSDB_CODE_INVALID_MSG
;
goto
GET_DB_CFG
_OVER
;
goto
_OVER
;
}
}
pDb
=
mndAcquireDb
(
pMnode
,
cfgReq
.
db
);
pDb
=
mndAcquireDb
(
pMnode
,
cfgReq
.
db
);
if
(
pDb
==
NULL
)
{
if
(
pDb
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DB_NOT_EXIST
;
terrno
=
TSDB_CODE_MND_DB_NOT_EXIST
;
goto
GET_DB_CFG
_OVER
;
goto
_OVER
;
}
}
cfgRsp
.
numOfVgroups
=
pDb
->
cfg
.
numOfVgroups
;
cfgRsp
.
numOfVgroups
=
pDb
->
cfg
.
numOfVgroups
;
...
@@ -866,7 +819,7 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) {
...
@@ -866,7 +819,7 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) {
if
(
pRsp
==
NULL
)
{
if
(
pRsp
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
-
1
;
code
=
-
1
;
goto
GET_DB_CFG
_OVER
;
goto
_OVER
;
}
}
tSerializeSDbCfgRsp
(
pRsp
,
contLen
,
&
cfgRsp
);
tSerializeSDbCfgRsp
(
pRsp
,
contLen
,
&
cfgRsp
);
...
@@ -876,9 +829,9 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) {
...
@@ -876,9 +829,9 @@ static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) {
code
=
0
;
code
=
0
;
GET_DB_CFG
_OVER:
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
if
(
code
!=
0
)
{
mError
(
"db:%s, failed to get cfg since %s"
,
cfgReq
.
db
,
terrstr
());
mError
(
"db:%s, failed to get cfg since %s"
,
cfgReq
.
db
,
terrstr
());
}
}
...
@@ -1097,7 +1050,8 @@ _OVER:
...
@@ -1097,7 +1050,8 @@ _OVER:
return
code
;
return
code
;
}
}
void
mndGetDBTableNum
(
SDbObj
*
pDb
,
SMnode
*
pMnode
,
int32_t
*
num
)
{
static
int32_t
mndGetDBTableNum
(
SDbObj
*
pDb
,
SMnode
*
pMnode
)
{
int32_t
numOfTables
=
0
;
int32_t
vindex
=
0
;
int32_t
vindex
=
0
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
...
@@ -1108,8 +1062,7 @@ void mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode, int32_t *num) {
...
@@ -1108,8 +1062,7 @@ void mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode, int32_t *num) {
if
(
pIter
==
NULL
)
break
;
if
(
pIter
==
NULL
)
break
;
if
(
pVgroup
->
dbUid
==
pDb
->
uid
)
{
if
(
pVgroup
->
dbUid
==
pDb
->
uid
)
{
*
num
+=
pVgroup
->
numOfTables
/
TSDB_TABLE_NUM_UNIT
;
numOfTables
+=
pVgroup
->
numOfTables
/
TSDB_TABLE_NUM_UNIT
;
vindex
++
;
vindex
++
;
}
}
...
@@ -1117,6 +1070,7 @@ void mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode, int32_t *num) {
...
@@ -1117,6 +1070,7 @@ void mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode, int32_t *num) {
}
}
sdbCancelFetch
(
pSdb
,
pIter
);
sdbCancelFetch
(
pSdb
,
pIter
);
return
numOfTables
;
}
}
static
void
mndBuildDBVgroupInfo
(
SDbObj
*
pDb
,
SMnode
*
pMnode
,
SArray
*
pVgList
)
{
static
void
mndBuildDBVgroupInfo
(
SDbObj
*
pDb
,
SMnode
*
pMnode
,
SArray
*
pVgList
)
{
...
@@ -1170,8 +1124,7 @@ int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUs
...
@@ -1170,8 +1124,7 @@ int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUs
return
-
1
;
return
-
1
;
}
}
int32_t
numOfTable
=
0
;
int32_t
numOfTable
=
mndGetDBTableNum
(
pDb
,
pMnode
);
mndGetDBTableNum
(
pDb
,
pMnode
,
&
numOfTable
);
if
(
pReq
==
NULL
||
pReq
->
vgVersion
<
pDb
->
vgVersion
||
pReq
->
dbId
!=
pDb
->
uid
||
numOfTable
!=
pReq
->
numOfTable
)
{
if
(
pReq
==
NULL
||
pReq
->
vgVersion
<
pDb
->
vgVersion
||
pReq
->
dbId
!=
pDb
->
uid
||
numOfTable
!=
pReq
->
numOfTable
)
{
mndBuildDBVgroupInfo
(
pDb
,
pMnode
,
pRsp
->
pVgroupInfos
);
mndBuildDBVgroupInfo
(
pDb
,
pMnode
,
pRsp
->
pVgroupInfos
);
...
@@ -1195,7 +1148,7 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
...
@@ -1195,7 +1148,7 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
if
(
tDeserializeSUseDbReq
(
pReq
->
rpcMsg
.
pCont
,
pReq
->
rpcMsg
.
contLen
,
&
usedbReq
)
!=
0
)
{
if
(
tDeserializeSUseDbReq
(
pReq
->
rpcMsg
.
pCont
,
pReq
->
rpcMsg
.
contLen
,
&
usedbReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
terrno
=
TSDB_CODE_INVALID_MSG
;
goto
USE_DB
_OVER
;
goto
_OVER
;
}
}
char
*
p
=
strchr
(
usedbReq
.
db
,
'.'
);
char
*
p
=
strchr
(
usedbReq
.
db
,
'.'
);
...
@@ -1206,12 +1159,11 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
...
@@ -1206,12 +1159,11 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
usedbRsp
.
pVgroupInfos
=
taosArrayInit
(
10
,
sizeof
(
SVgroupInfo
));
usedbRsp
.
pVgroupInfos
=
taosArrayInit
(
10
,
sizeof
(
SVgroupInfo
));
if
(
usedbRsp
.
pVgroupInfos
==
NULL
)
{
if
(
usedbRsp
.
pVgroupInfos
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
USE_DB
_OVER
;
goto
_OVER
;
}
}
mndBuildDBVgroupInfo
(
NULL
,
pMnode
,
usedbRsp
.
pVgroupInfos
);
mndBuildDBVgroupInfo
(
NULL
,
pMnode
,
usedbRsp
.
pVgroupInfos
);
usedbRsp
.
vgVersion
=
vgVersion
++
;
usedbRsp
.
vgVersion
=
vgVersion
++
;
}
else
{
}
else
{
usedbRsp
.
vgVersion
=
usedbReq
.
vgVersion
;
usedbRsp
.
vgVersion
=
usedbReq
.
vgVersion
;
}
}
...
@@ -1232,15 +1184,15 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
...
@@ -1232,15 +1184,15 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
}
else
{
}
else
{
pUser
=
mndAcquireUser
(
pMnode
,
pReq
->
user
);
pUser
=
mndAcquireUser
(
pMnode
,
pReq
->
user
);
if
(
pUser
==
NULL
)
{
if
(
pUser
==
NULL
)
{
goto
USE_DB
_OVER
;
goto
_OVER
;
}
}
if
(
mndCheckUseDbAuth
(
pUser
,
pDb
)
!=
0
)
{
if
(
mndCheckUseDbAuth
(
pUser
,
pDb
)
!=
0
)
{
goto
USE_DB
_OVER
;
goto
_OVER
;
}
}
if
(
mndExtractDbInfo
(
pMnode
,
pDb
,
&
usedbRsp
,
&
usedbReq
)
<
0
)
{
if
(
mndExtractDbInfo
(
pMnode
,
pDb
,
&
usedbRsp
,
&
usedbReq
)
<
0
)
{
goto
USE_DB
_OVER
;
goto
_OVER
;
}
}
code
=
0
;
code
=
0
;
...
@@ -1252,7 +1204,7 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
...
@@ -1252,7 +1204,7 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
if
(
pRsp
==
NULL
)
{
if
(
pRsp
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
-
1
;
code
=
-
1
;
goto
USE_DB
_OVER
;
goto
_OVER
;
}
}
tSerializeSUseDbRsp
(
pRsp
,
contLen
,
&
usedbRsp
);
tSerializeSUseDbRsp
(
pRsp
,
contLen
,
&
usedbRsp
);
...
@@ -1260,7 +1212,7 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
...
@@ -1260,7 +1212,7 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) {
pReq
->
pRsp
=
pRsp
;
pReq
->
pRsp
=
pRsp
;
pReq
->
rspLen
=
contLen
;
pReq
->
rspLen
=
contLen
;
USE_DB
_OVER:
_OVER:
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
mError
(
"db:%s, failed to process use db req since %s"
,
usedbReq
.
db
,
terrstr
());
mError
(
"db:%s, failed to process use db req since %s"
,
usedbReq
.
db
,
terrstr
());
}
}
...
@@ -1298,8 +1250,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
...
@@ -1298,8 +1250,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
continue
;
continue
;
}
}
int32_t
numOfTable
=
0
;
int32_t
numOfTable
=
mndGetDBTableNum
(
pDb
,
pMnode
);
mndGetDBTableNum
(
pDb
,
pMnode
,
&
numOfTable
);
if
(
pDbVgVersion
->
vgVersion
>=
pDb
->
vgVersion
&&
numOfTable
==
pDbVgVersion
->
numOfTable
)
{
if
(
pDbVgVersion
->
vgVersion
>=
pDb
->
vgVersion
&&
numOfTable
==
pDbVgVersion
->
numOfTable
)
{
mDebug
(
"db:%s, version & numOfTable not changed"
,
pDbVgVersion
->
dbFName
);
mDebug
(
"db:%s, version & numOfTable not changed"
,
pDbVgVersion
->
dbFName
);
...
@@ -1514,9 +1465,6 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
...
@@ -1514,9 +1465,6 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
);
colDataAppend
(
pColInfo
,
rows
,
(
const
char
*
)
b
,
false
);
colDataAppend
(
pColInfo
,
rows
,
(
const
char
*
)
b
,
false
);
}
}
// pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
// *(int8_t *)pWrite = pDb->cfg.update;
}
}
static
void
setInformationSchemaDbCfg
(
SDbObj
*
pDbObj
)
{
static
void
setInformationSchemaDbCfg
(
SDbObj
*
pDbObj
)
{
...
@@ -1544,7 +1492,6 @@ static void setPerfSchemaDbCfg(SDbObj *pDbObj) {
...
@@ -1544,7 +1492,6 @@ static void setPerfSchemaDbCfg(SDbObj *pDbObj) {
static
bool
mndGetTablesOfDbFp
(
SMnode
*
pMnode
,
void
*
pObj
,
void
*
p1
,
void
*
p2
,
void
*
p3
)
{
static
bool
mndGetTablesOfDbFp
(
SMnode
*
pMnode
,
void
*
pObj
,
void
*
p1
,
void
*
p2
,
void
*
p3
)
{
SVgObj
*
pVgroup
=
pObj
;
SVgObj
*
pVgroup
=
pObj
;
int32_t
*
numOfTables
=
p1
;
int32_t
*
numOfTables
=
p1
;
*
numOfTables
+=
pVgroup
->
numOfTables
;
*
numOfTables
+=
pVgroup
->
numOfTables
;
return
true
;
return
true
;
}
}
...
@@ -1594,49 +1541,3 @@ static void mndCancelGetNextDb(SMnode *pMnode, void *pIter) {
...
@@ -1594,49 +1541,3 @@ static void mndCancelGetNextDb(SMnode *pMnode, void *pIter) {
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbCancelFetch
(
pSdb
,
pIter
);
sdbCancelFetch
(
pSdb
,
pIter
);
}
}
static
int32_t
mndProcessGetIndexReq
(
SNodeMsg
*
pReq
)
{
SUserIndexReq
indexReq
=
{
0
};
SMnode
*
pMnode
=
pReq
->
pNode
;
int32_t
code
=
-
1
;
SUserIndexRsp
rsp
=
{
0
};
bool
exist
=
false
;
if
(
tDeserializeSUserIndexReq
(
pReq
->
rpcMsg
.
pCont
,
pReq
->
rpcMsg
.
contLen
,
&
indexReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
goto
_OVER
;
}
code
=
mndProcessGetSmaReq
(
pMnode
,
&
indexReq
,
&
rsp
,
&
exist
);
if
(
code
)
{
goto
_OVER
;
}
if
(
!
exist
)
{
// TODO GET INDEX FROM FULLTEXT
code
=
-
1
;
terrno
=
TSDB_CODE_MND_DB_INDEX_NOT_EXIST
;
}
else
{
int32_t
contLen
=
tSerializeSUserIndexRsp
(
NULL
,
0
,
&
rsp
);
void
*
pRsp
=
rpcMallocCont
(
contLen
);
if
(
pRsp
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
-
1
;
goto
_OVER
;
}
tSerializeSUserIndexRsp
(
pRsp
,
contLen
,
&
rsp
);
pReq
->
pRsp
=
pRsp
;
pReq
->
rspLen
=
contLen
;
code
=
0
;
}
_OVER:
if
(
code
!=
0
)
{
mError
(
"failed to get index %s since %s"
,
indexReq
.
indexFName
,
terrstr
());
}
return
code
;
}
source/dnode/mnode/impl/src/mndSma.c
浏览文件 @
a5e9b14d
...
@@ -40,6 +40,7 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq);
...
@@ -40,6 +40,7 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq);
static
int32_t
mndProcessMDropSmaReq
(
SNodeMsg
*
pReq
);
static
int32_t
mndProcessMDropSmaReq
(
SNodeMsg
*
pReq
);
static
int32_t
mndProcessVCreateSmaRsp
(
SNodeMsg
*
pRsp
);
static
int32_t
mndProcessVCreateSmaRsp
(
SNodeMsg
*
pRsp
);
static
int32_t
mndProcessVDropSmaRsp
(
SNodeMsg
*
pRsp
);
static
int32_t
mndProcessVDropSmaRsp
(
SNodeMsg
*
pRsp
);
static
int32_t
mndProcessGetSmaReq
(
SNodeMsg
*
pReq
);
static
int32_t
mndRetrieveSma
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
int32_t
mndRetrieveSma
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
void
mndCancelGetNextSma
(
SMnode
*
pMnode
,
void
*
pIter
);
static
void
mndCancelGetNextSma
(
SMnode
*
pMnode
,
void
*
pIter
);
...
@@ -56,6 +57,7 @@ int32_t mndInitSma(SMnode *pMnode) {
...
@@ -56,6 +57,7 @@ int32_t mndInitSma(SMnode *pMnode) {
mndSetMsgHandle
(
pMnode
,
TDMT_MND_DROP_SMA
,
mndProcessMDropSmaReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_DROP_SMA
,
mndProcessMDropSmaReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_CREATE_SMA_RSP
,
mndProcessVCreateSmaRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_CREATE_SMA_RSP
,
mndProcessVCreateSmaRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_DROP_SMA_RSP
,
mndProcessVDropSmaRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_DROP_SMA_RSP
,
mndProcessVDropSmaRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_GET_INDEX
,
mndProcessGetSmaReq
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_INDEX
,
mndRetrieveSma
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_INDEX
,
mndRetrieveSma
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_INDEX
,
mndCancelGetNextSma
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_INDEX
,
mndCancelGetNextSma
);
...
@@ -686,7 +688,7 @@ _OVER:
...
@@ -686,7 +688,7 @@ _OVER:
return
code
;
return
code
;
}
}
int32_t
mndProcessGetSmaReq
(
SMnode
*
pMnode
,
SUserIndexReq
*
indexReq
,
SUserIndexRsp
*
rsp
,
bool
*
exist
)
{
static
int32_t
mndGetSma
(
SMnode
*
pMnode
,
SUserIndexReq
*
indexReq
,
SUserIndexRsp
*
rsp
,
bool
*
exist
)
{
int32_t
code
=
-
1
;
int32_t
code
=
-
1
;
SSmaObj
*
pSma
=
NULL
;
SSmaObj
*
pSma
=
NULL
;
...
@@ -715,6 +717,51 @@ int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexR
...
@@ -715,6 +717,51 @@ int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexR
}
}
mndReleaseSma
(
pMnode
,
pSma
);
mndReleaseSma
(
pMnode
,
pSma
);
return
code
;
}
static
int32_t
mndProcessGetSmaReq
(
SNodeMsg
*
pReq
)
{
SUserIndexReq
indexReq
=
{
0
};
SMnode
*
pMnode
=
pReq
->
pNode
;
int32_t
code
=
-
1
;
SUserIndexRsp
rsp
=
{
0
};
bool
exist
=
false
;
if
(
tDeserializeSUserIndexReq
(
pReq
->
rpcMsg
.
pCont
,
pReq
->
rpcMsg
.
contLen
,
&
indexReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
goto
_OVER
;
}
code
=
mndGetSma
(
pMnode
,
&
indexReq
,
&
rsp
,
&
exist
);
if
(
code
)
{
goto
_OVER
;
}
if
(
!
exist
)
{
// TODO GET INDEX FROM FULLTEXT
code
=
-
1
;
terrno
=
TSDB_CODE_MND_DB_INDEX_NOT_EXIST
;
}
else
{
int32_t
contLen
=
tSerializeSUserIndexRsp
(
NULL
,
0
,
&
rsp
);
void
*
pRsp
=
rpcMallocCont
(
contLen
);
if
(
pRsp
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
-
1
;
goto
_OVER
;
}
tSerializeSUserIndexRsp
(
pRsp
,
contLen
,
&
rsp
);
pReq
->
pRsp
=
pRsp
;
pReq
->
rspLen
=
contLen
;
code
=
0
;
}
_OVER:
if
(
code
!=
0
)
{
mError
(
"failed to get index %s since %s"
,
indexReq
.
indexFName
,
terrstr
());
}
return
code
;
return
code
;
}
}
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
a5e9b14d
...
@@ -1050,7 +1050,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
...
@@ -1050,7 +1050,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
static
int32_t
mndTransExecuteRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
static
int32_t
mndTransExecuteRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
mndTransExecuteActions
(
pMnode
,
pTrans
,
pTrans
->
redoActions
);
int32_t
code
=
mndTransExecuteActions
(
pMnode
,
pTrans
,
pTrans
->
redoActions
);
if
(
code
!=
0
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
mError
(
"failed to execute redoActions since %s"
,
terrstr
());
mError
(
"failed to execute redoActions since %s"
,
terrstr
());
}
}
return
code
;
return
code
;
...
@@ -1058,7 +1058,7 @@ static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) {
...
@@ -1058,7 +1058,7 @@ static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) {
static
int32_t
mndTransExecuteUndoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
static
int32_t
mndTransExecuteUndoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
mndTransExecuteActions
(
pMnode
,
pTrans
,
pTrans
->
undoActions
);
int32_t
code
=
mndTransExecuteActions
(
pMnode
,
pTrans
,
pTrans
->
undoActions
);
if
(
code
!=
0
)
{
if
(
code
!=
0
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
mError
(
"failed to execute undoActions since %s"
,
terrstr
());
mError
(
"failed to execute undoActions since %s"
,
terrstr
());
}
}
return
code
;
return
code
;
...
...
source/dnode/mnode/impl/src/mndUser.c
浏览文件 @
a5e9b14d
...
@@ -186,6 +186,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
...
@@ -186,6 +186,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
}
}
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
USER_RESERVE_SIZE
,
_OVER
)
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
USER_RESERVE_SIZE
,
_OVER
)
taosInitRWLatch
(
&
pUser
->
lock
);
terrno
=
0
;
terrno
=
0
;
...
@@ -228,11 +229,12 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) {
...
@@ -228,11 +229,12 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) {
static
int32_t
mndUserActionUpdate
(
SSdb
*
pSdb
,
SUserObj
*
pOld
,
SUserObj
*
pNew
)
{
static
int32_t
mndUserActionUpdate
(
SSdb
*
pSdb
,
SUserObj
*
pOld
,
SUserObj
*
pNew
)
{
mTrace
(
"user:%s, perform update action, old row:%p new row:%p"
,
pOld
->
user
,
pOld
,
pNew
);
mTrace
(
"user:%s, perform update action, old row:%p new row:%p"
,
pOld
->
user
,
pOld
,
pNew
);
memcpy
(
pOld
->
pass
,
pNew
->
pass
,
TSDB_PASSWORD_LEN
);
taosWLockLatch
(
&
pOld
->
lock
);
pOld
->
updateTime
=
pNew
->
updateTime
;
pOld
->
updateTime
=
pNew
->
updateTime
;
memcpy
(
pOld
->
pass
,
pNew
->
pass
,
TSDB_PASSWORD_LEN
);
TSWAP
(
pOld
->
readDbs
,
pNew
->
readDbs
);
TSWAP
(
pOld
->
readDbs
,
pNew
->
readDbs
);
TSWAP
(
pOld
->
writeDbs
,
pNew
->
writeDbs
);
TSWAP
(
pOld
->
writeDbs
,
pNew
->
writeDbs
);
taosWUnLockLatch
(
&
pOld
->
lock
);
return
0
;
return
0
;
}
}
...
@@ -426,8 +428,12 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) {
...
@@ -426,8 +428,12 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) {
}
}
memcpy
(
&
newUser
,
pUser
,
sizeof
(
SUserObj
));
memcpy
(
&
newUser
,
pUser
,
sizeof
(
SUserObj
));
taosRLockLatch
(
&
pUser
->
lock
);
newUser
.
readDbs
=
mndDupDbHash
(
pUser
->
readDbs
);
newUser
.
readDbs
=
mndDupDbHash
(
pUser
->
readDbs
);
newUser
.
writeDbs
=
mndDupDbHash
(
pUser
->
writeDbs
);
newUser
.
writeDbs
=
mndDupDbHash
(
pUser
->
writeDbs
);
taosRUnLockLatch
(
&
pUser
->
lock
);
if
(
newUser
.
readDbs
==
NULL
||
newUser
.
writeDbs
==
NULL
)
{
if
(
newUser
.
readDbs
==
NULL
||
newUser
.
writeDbs
==
NULL
)
{
goto
_OVER
;
goto
_OVER
;
}
}
...
@@ -586,8 +592,10 @@ static int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUser
...
@@ -586,8 +592,10 @@ static int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUser
memcpy
(
pRsp
->
user
,
pUser
->
user
,
TSDB_USER_LEN
);
memcpy
(
pRsp
->
user
,
pUser
->
user
,
TSDB_USER_LEN
);
pRsp
->
superAuth
=
pUser
->
superUser
;
pRsp
->
superAuth
=
pUser
->
superUser
;
pRsp
->
version
=
pUser
->
authVersion
;
pRsp
->
version
=
pUser
->
authVersion
;
taosRLockLatch
(
&
pUser
->
lock
);
pRsp
->
readDbs
=
mndDupDbHash
(
pUser
->
readDbs
);
pRsp
->
readDbs
=
mndDupDbHash
(
pUser
->
readDbs
);
pRsp
->
writeDbs
=
mndDupDbHash
(
pUser
->
writeDbs
);
pRsp
->
writeDbs
=
mndDupDbHash
(
pUser
->
writeDbs
);
taosRUnLockLatch
(
&
pUser
->
lock
);
pRsp
->
createdDbs
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
pRsp
->
createdDbs
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
if
(
NULL
==
pRsp
->
createdDbs
)
{
if
(
NULL
==
pRsp
->
createdDbs
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
a5e9b14d
...
@@ -21,8 +21,8 @@
...
@@ -21,8 +21,8 @@
#include "mndShow.h"
#include "mndShow.h"
#include "mndTrans.h"
#include "mndTrans.h"
#define
TSDB_
VGROUP_VER_NUMBER 1
#define VGROUP_VER_NUMBER 1
#define
TSDB_
VGROUP_RESERVE_SIZE 64
#define VGROUP_RESERVE_SIZE 64
static
SSdbRow
*
mndVgroupActionDecode
(
SSdbRaw
*
pRaw
);
static
SSdbRow
*
mndVgroupActionDecode
(
SSdbRaw
*
pRaw
);
static
int32_t
mndVgroupActionInsert
(
SSdb
*
pSdb
,
SVgObj
*
pVgroup
);
static
int32_t
mndVgroupActionInsert
(
SSdb
*
pSdb
,
SVgObj
*
pVgroup
);
...
@@ -34,19 +34,21 @@ static int32_t mndProcessAlterVnodeRsp(SNodeMsg *pRsp);
...
@@ -34,19 +34,21 @@ static int32_t mndProcessAlterVnodeRsp(SNodeMsg *pRsp);
static
int32_t
mndProcessDropVnodeRsp
(
SNodeMsg
*
pRsp
);
static
int32_t
mndProcessDropVnodeRsp
(
SNodeMsg
*
pRsp
);
static
int32_t
mndProcessCompactVnodeRsp
(
SNodeMsg
*
pRsp
);
static
int32_t
mndProcessCompactVnodeRsp
(
SNodeMsg
*
pRsp
);
static
int32_t
mndRetrieveVgroups
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
int32_t
mndRetrieveVgroups
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
void
mndCancelGetNextVgroup
(
SMnode
*
pMnode
,
void
*
pIter
);
static
void
mndCancelGetNextVgroup
(
SMnode
*
pMnode
,
void
*
pIter
);
static
int32_t
mndRetrieveVnodes
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
int32_t
mndRetrieveVnodes
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
void
mndCancelGetNextVnode
(
SMnode
*
pMnode
,
void
*
pIter
);
static
void
mndCancelGetNextVnode
(
SMnode
*
pMnode
,
void
*
pIter
);
int32_t
mndInitVgroup
(
SMnode
*
pMnode
)
{
int32_t
mndInitVgroup
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_VGROUP
,
SSdbTable
table
=
{
.
sdbType
=
SDB_VGROUP
,
.
keyType
=
SDB_KEY_INT32
,
.
keyType
=
SDB_KEY_INT32
,
.
encodeFp
=
(
SdbEncodeFp
)
mndVgroupActionEncode
,
.
encodeFp
=
(
SdbEncodeFp
)
mndVgroupActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndVgroupActionDecode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndVgroupActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndVgroupActionInsert
,
.
insertFp
=
(
SdbInsertFp
)
mndVgroupActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndVgroupActionUpdate
,
.
updateFp
=
(
SdbUpdateFp
)
mndVgroupActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndVgroupActionDelete
};
.
deleteFp
=
(
SdbDeleteFp
)
mndVgroupActionDelete
,
};
mndSetMsgHandle
(
pMnode
,
TDMT_DND_CREATE_VNODE_RSP
,
mndProcessCreateVnodeRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_DND_CREATE_VNODE_RSP
,
mndProcessCreateVnodeRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_ALTER_VNODE_RSP
,
mndProcessAlterVnodeRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_ALTER_VNODE_RSP
,
mndProcessAlterVnodeRsp
);
...
@@ -66,29 +68,29 @@ void mndCleanupVgroup(SMnode *pMnode) {}
...
@@ -66,29 +68,29 @@ void mndCleanupVgroup(SMnode *pMnode) {}
SSdbRaw
*
mndVgroupActionEncode
(
SVgObj
*
pVgroup
)
{
SSdbRaw
*
mndVgroupActionEncode
(
SVgObj
*
pVgroup
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_VGROUP
,
TSDB_VGROUP_VER_NUMBER
,
sizeof
(
SVgObj
)
+
TSDB_
VGROUP_RESERVE_SIZE
);
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_VGROUP
,
VGROUP_VER_NUMBER
,
sizeof
(
SVgObj
)
+
VGROUP_RESERVE_SIZE
);
if
(
pRaw
==
NULL
)
goto
VG_ENCODE
_OVER
;
if
(
pRaw
==
NULL
)
goto
_OVER
;
int32_t
dataPos
=
0
;
int32_t
dataPos
=
0
;
SDB_SET_INT32
(
pRaw
,
dataPos
,
pVgroup
->
vgId
,
VG_ENCODE
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pVgroup
->
vgId
,
_OVER
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pVgroup
->
createdTime
,
VG_ENCODE
_OVER
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pVgroup
->
createdTime
,
_OVER
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pVgroup
->
updateTime
,
VG_ENCODE
_OVER
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pVgroup
->
updateTime
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pVgroup
->
version
,
VG_ENCODE
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pVgroup
->
version
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pVgroup
->
hashBegin
,
VG_ENCODE
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pVgroup
->
hashBegin
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pVgroup
->
hashEnd
,
VG_ENCODE
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pVgroup
->
hashEnd
,
_OVER
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pVgroup
->
dbName
,
TSDB_DB_FNAME_LEN
,
VG_ENCODE
_OVER
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pVgroup
->
dbName
,
TSDB_DB_FNAME_LEN
,
_OVER
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pVgroup
->
dbUid
,
VG_ENCODE
_OVER
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pVgroup
->
dbUid
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pVgroup
->
replica
,
VG_ENCODE
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pVgroup
->
replica
,
_OVER
)
for
(
int8_t
i
=
0
;
i
<
pVgroup
->
replica
;
++
i
)
{
for
(
int8_t
i
=
0
;
i
<
pVgroup
->
replica
;
++
i
)
{
SVnodeGid
*
pVgid
=
&
pVgroup
->
vnodeGid
[
i
];
SVnodeGid
*
pVgid
=
&
pVgroup
->
vnodeGid
[
i
];
SDB_SET_INT32
(
pRaw
,
dataPos
,
pVgid
->
dnodeId
,
VG_ENCODE
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pVgid
->
dnodeId
,
_OVER
)
}
}
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
TSDB_VGROUP_RESERVE_SIZE
,
VG_ENCODE
_OVER
)
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
VGROUP_RESERVE_SIZE
,
_OVER
)
SDB_SET_DATALEN
(
pRaw
,
dataPos
,
VG_ENCODE
_OVER
)
SDB_SET_DATALEN
(
pRaw
,
dataPos
,
_OVER
)
terrno
=
0
;
terrno
=
0
;
VG_ENCODE
_OVER:
_OVER:
if
(
terrno
!=
0
)
{
if
(
terrno
!=
0
)
{
mError
(
"vgId:%d, failed to encode to raw:%p since %s"
,
pVgroup
->
vgId
,
pRaw
,
terrstr
());
mError
(
"vgId:%d, failed to encode to raw:%p since %s"
,
pVgroup
->
vgId
,
pRaw
,
terrstr
());
sdbFreeRaw
(
pRaw
);
sdbFreeRaw
(
pRaw
);
...
@@ -103,41 +105,41 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
...
@@ -103,41 +105,41 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
int8_t
sver
=
0
;
int8_t
sver
=
0
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
goto
VG_DECODE
_OVER
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
goto
_OVER
;
if
(
sver
!=
TSDB_
VGROUP_VER_NUMBER
)
{
if
(
sver
!=
VGROUP_VER_NUMBER
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_VER
;
terrno
=
TSDB_CODE_SDB_INVALID_DATA_VER
;
goto
VG_DECODE
_OVER
;
goto
_OVER
;
}
}
SSdbRow
*
pRow
=
sdbAllocRow
(
sizeof
(
SVgObj
));
SSdbRow
*
pRow
=
sdbAllocRow
(
sizeof
(
SVgObj
));
if
(
pRow
==
NULL
)
goto
VG_DECODE
_OVER
;
if
(
pRow
==
NULL
)
goto
_OVER
;
SVgObj
*
pVgroup
=
sdbGetRowObj
(
pRow
);
SVgObj
*
pVgroup
=
sdbGetRowObj
(
pRow
);
if
(
pVgroup
==
NULL
)
goto
VG_DECODE
_OVER
;
if
(
pVgroup
==
NULL
)
goto
_OVER
;
int32_t
dataPos
=
0
;
int32_t
dataPos
=
0
;
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pVgroup
->
vgId
,
VG_DECODE
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pVgroup
->
vgId
,
_OVER
)
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pVgroup
->
createdTime
,
VG_DECODE
_OVER
)
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pVgroup
->
createdTime
,
_OVER
)
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pVgroup
->
updateTime
,
VG_DECODE
_OVER
)
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pVgroup
->
updateTime
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pVgroup
->
version
,
VG_DECODE
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pVgroup
->
version
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pVgroup
->
hashBegin
,
VG_DECODE
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pVgroup
->
hashBegin
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pVgroup
->
hashEnd
,
VG_DECODE
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pVgroup
->
hashEnd
,
_OVER
)
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pVgroup
->
dbName
,
TSDB_DB_FNAME_LEN
,
VG_DECODE
_OVER
)
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pVgroup
->
dbName
,
TSDB_DB_FNAME_LEN
,
_OVER
)
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pVgroup
->
dbUid
,
VG_DECODE
_OVER
)
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pVgroup
->
dbUid
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
pVgroup
->
replica
,
VG_DECODE
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
pVgroup
->
replica
,
_OVER
)
for
(
int8_t
i
=
0
;
i
<
pVgroup
->
replica
;
++
i
)
{
for
(
int8_t
i
=
0
;
i
<
pVgroup
->
replica
;
++
i
)
{
SVnodeGid
*
pVgid
=
&
pVgroup
->
vnodeGid
[
i
];
SVnodeGid
*
pVgid
=
&
pVgroup
->
vnodeGid
[
i
];
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pVgid
->
dnodeId
,
VG_DECODE
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pVgid
->
dnodeId
,
_OVER
)
if
(
pVgroup
->
replica
==
1
)
{
if
(
pVgroup
->
replica
==
1
)
{
pVgid
->
role
=
TAOS_SYNC_STATE_LEADER
;
pVgid
->
role
=
TAOS_SYNC_STATE_LEADER
;
}
}
}
}
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
TSDB_VGROUP_RESERVE_SIZE
,
VG_DECODE
_OVER
)
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
VGROUP_RESERVE_SIZE
,
_OVER
)
terrno
=
0
;
terrno
=
0
;
VG_DECODE
_OVER:
_OVER:
if
(
terrno
!=
0
)
{
if
(
terrno
!=
0
)
{
mError
(
"vgId:%d, failed to decode from raw:%p since %s"
,
pVgroup
->
vgId
,
pRaw
,
terrstr
());
mError
(
"vgId:%d, failed to decode from raw:%p since %s"
,
pVgroup
->
vgId
,
pRaw
,
terrstr
());
taosMemoryFreeClear
(
pRow
);
taosMemoryFreeClear
(
pRow
);
...
@@ -254,6 +256,68 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
...
@@ -254,6 +256,68 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
return
pReq
;
return
pReq
;
}
}
void
*
mndBuildAlterVnodeReq
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
int32_t
*
pContLen
)
{
SAlterVnodeReq
alterReq
=
{
0
};
alterReq
.
vgVersion
=
pVgroup
->
version
;
alterReq
.
buffer
=
pDb
->
cfg
.
buffer
;
alterReq
.
pages
=
pDb
->
cfg
.
pages
;
alterReq
.
pageSize
=
pDb
->
cfg
.
pageSize
;
alterReq
.
daysPerFile
=
pDb
->
cfg
.
daysPerFile
;
alterReq
.
daysToKeep0
=
pDb
->
cfg
.
daysToKeep0
;
alterReq
.
daysToKeep1
=
pDb
->
cfg
.
daysToKeep1
;
alterReq
.
daysToKeep2
=
pDb
->
cfg
.
daysToKeep2
;
alterReq
.
fsyncPeriod
=
pDb
->
cfg
.
fsyncPeriod
;
alterReq
.
walLevel
=
pDb
->
cfg
.
walLevel
;
alterReq
.
strict
=
pDb
->
cfg
.
strict
;
alterReq
.
cacheLastRow
=
pDb
->
cfg
.
cacheLastRow
;
alterReq
.
replica
=
pVgroup
->
replica
;
alterReq
.
selfIndex
=
-
1
;
for
(
int32_t
v
=
0
;
v
<
pVgroup
->
replica
;
++
v
)
{
SReplica
*
pReplica
=
&
alterReq
.
replicas
[
v
];
SVnodeGid
*
pVgid
=
&
pVgroup
->
vnodeGid
[
v
];
SDnodeObj
*
pVgidDnode
=
mndAcquireDnode
(
pMnode
,
pVgid
->
dnodeId
);
if
(
pVgidDnode
==
NULL
)
{
return
NULL
;
}
pReplica
->
id
=
pVgidDnode
->
id
;
pReplica
->
port
=
pVgidDnode
->
port
;
memcpy
(
pReplica
->
fqdn
,
pVgidDnode
->
fqdn
,
TSDB_FQDN_LEN
);
mndReleaseDnode
(
pMnode
,
pVgidDnode
);
if
(
pDnode
->
id
==
pVgid
->
dnodeId
)
{
alterReq
.
selfIndex
=
v
;
}
}
if
(
alterReq
.
selfIndex
==
-
1
)
{
terrno
=
TSDB_CODE_MND_APP_ERROR
;
return
NULL
;
}
int32_t
contLen
=
tSerializeSAlterVnodeReq
(
NULL
,
0
,
&
alterReq
);
if
(
contLen
<
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
contLen
+=
+
sizeof
(
SMsgHead
);
void
*
pReq
=
taosMemoryMalloc
(
contLen
);
if
(
pReq
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
SMsgHead
*
pHead
=
pReq
;
pHead
->
contLen
=
htonl
(
contLen
);
pHead
->
vgId
=
htonl
(
pVgroup
->
vgId
);
tSerializeSAlterVnodeReq
((
char
*
)
pReq
+
sizeof
(
SMsgHead
),
contLen
,
&
alterReq
);
*
pContLen
=
contLen
;
return
pReq
;
}
void
*
mndBuildDropVnodeReq
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
int32_t
*
pContLen
)
{
void
*
mndBuildDropVnodeReq
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
int32_t
*
pContLen
)
{
SDropVnodeReq
dropReq
=
{
0
};
SDropVnodeReq
dropReq
=
{
0
};
dropReq
.
dnodeId
=
pDnode
->
id
;
dropReq
.
dnodeId
=
pDnode
->
id
;
...
@@ -372,12 +436,12 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
...
@@ -372,12 +436,12 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
pVgroups
=
taosMemoryCalloc
(
pDb
->
cfg
.
numOfVgroups
,
sizeof
(
SVgObj
));
pVgroups
=
taosMemoryCalloc
(
pDb
->
cfg
.
numOfVgroups
,
sizeof
(
SVgObj
));
if
(
pVgroups
==
NULL
)
{
if
(
pVgroups
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
ALLOC_VGROUP
_OVER
;
goto
_OVER
;
}
}
pArray
=
mndBuildDnodesArray
(
pMnode
);
pArray
=
mndBuildDnodesArray
(
pMnode
);
if
(
pArray
==
NULL
)
{
if
(
pArray
==
NULL
)
{
goto
ALLOC_VGROUP
_OVER
;
goto
_OVER
;
}
}
mDebug
(
"db:%s, total %d dnodes used to create %d vgroups (%d vnodes)"
,
pDb
->
name
,
(
int32_t
)
taosArrayGetSize
(
pArray
),
mDebug
(
"db:%s, total %d dnodes used to create %d vgroups (%d vnodes)"
,
pDb
->
name
,
(
int32_t
)
taosArrayGetSize
(
pArray
),
...
@@ -410,7 +474,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
...
@@ -410,7 +474,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
if
(
mndGetAvailableDnode
(
pMnode
,
pVgroup
,
pArray
)
!=
0
)
{
if
(
mndGetAvailableDnode
(
pMnode
,
pVgroup
,
pArray
)
!=
0
)
{
terrno
=
TSDB_CODE_MND_NO_ENOUGH_DNODES
;
terrno
=
TSDB_CODE_MND_NO_ENOUGH_DNODES
;
goto
ALLOC_VGROUP
_OVER
;
goto
_OVER
;
}
}
allocedVgroups
++
;
allocedVgroups
++
;
...
@@ -421,7 +485,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
...
@@ -421,7 +485,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
mDebug
(
"db:%s, %d vgroups is alloced, replica:%d"
,
pDb
->
name
,
pDb
->
cfg
.
numOfVgroups
,
pDb
->
cfg
.
replications
);
mDebug
(
"db:%s, %d vgroups is alloced, replica:%d"
,
pDb
->
name
,
pDb
->
cfg
.
numOfVgroups
,
pDb
->
cfg
.
replications
);
ALLOC_VGROUP
_OVER:
_OVER:
if
(
code
!=
0
)
taosMemoryFree
(
pVgroups
);
if
(
code
!=
0
)
taosMemoryFree
(
pVgroups
);
taosArrayDestroy
(
pArray
);
taosArrayDestroy
(
pArray
);
return
code
;
return
code
;
...
@@ -492,7 +556,7 @@ static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pRep
...
@@ -492,7 +556,7 @@ static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pRep
return
0
;
return
0
;
}
}
static
int32_t
mndRetrieveVgroups
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
)
{
static
int32_t
mndRetrieveVgroups
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
)
{
SMnode
*
pMnode
=
pReq
->
pNode
;
SMnode
*
pMnode
=
pReq
->
pNode
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
numOfRows
=
0
;
int32_t
numOfRows
=
0
;
...
@@ -533,7 +597,6 @@ static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock*
...
@@ -533,7 +597,6 @@ static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock*
// default 3 replica
// default 3 replica
for
(
int32_t
i
=
0
;
i
<
3
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
3
;
++
i
)
{
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
if
(
i
<
pVgroup
->
replica
)
{
if
(
i
<
pVgroup
->
replica
)
{
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pVgroup
->
vnodeGid
[
i
].
dnodeId
,
false
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pVgroup
->
vnodeGid
[
i
].
dnodeId
,
false
);
...
@@ -597,13 +660,12 @@ int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
...
@@ -597,13 +660,12 @@ int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
return
numOfVnodes
;
return
numOfVnodes
;
}
}
static
int32_t
mndRetrieveVnodes
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
)
{
static
int32_t
mndRetrieveVnodes
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
)
{
SMnode
*
pMnode
=
pReq
->
pNode
;
SMnode
*
pMnode
=
pReq
->
pNode
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
numOfRows
=
0
;
int32_t
numOfRows
=
0
;
SVgObj
*
pVgroup
=
NULL
;
SVgObj
*
pVgroup
=
NULL
;
int32_t
cols
=
0
;
int32_t
cols
=
0
;
// int32_t dnodeId = pShow->replica;
while
(
numOfRows
<
rows
)
{
while
(
numOfRows
<
rows
)
{
pShow
->
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pShow
->
pIter
,
(
void
**
)
&
pVgroup
);
pShow
->
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pShow
->
pIter
,
(
void
**
)
&
pVgroup
);
...
...
source/libs/transport/src/trans.c
浏览文件 @
a5e9b14d
...
@@ -46,9 +46,21 @@ void* rpcOpen(const SRpcInit* pInit) {
...
@@ -46,9 +46,21 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc
->
numOfThreads
=
pInit
->
numOfThreads
>
TSDB_MAX_RPC_THREADS
?
TSDB_MAX_RPC_THREADS
:
pInit
->
numOfThreads
;
pRpc
->
numOfThreads
=
pInit
->
numOfThreads
>
TSDB_MAX_RPC_THREADS
?
TSDB_MAX_RPC_THREADS
:
pInit
->
numOfThreads
;
}
}
uint32_t
ip
=
0
;
if
(
pInit
->
connType
==
TAOS_CONN_SERVER
)
{
ip
=
taosGetIpv4FromFqdn
(
pInit
->
localFqdn
);
if
(
ip
==
0xFFFFFFFF
)
{
tError
(
"invalid fqdn: %s"
,
pInit
->
localFqdn
);
terrno
=
TSDB_CODE_RPC_FQDN_ERROR
;
taosMemoryFree
(
pRpc
);
return
NULL
;
}
}
pRpc
->
connType
=
pInit
->
connType
;
pRpc
->
connType
=
pInit
->
connType
;
pRpc
->
idleTime
=
pInit
->
idleTime
;
pRpc
->
idleTime
=
pInit
->
idleTime
;
pRpc
->
tcphandle
=
(
*
taosInitHandle
[
pRpc
->
connType
])(
0
,
pInit
->
localPort
,
pRpc
->
label
,
pRpc
->
numOfThreads
,
NULL
,
pRpc
);
pRpc
->
tcphandle
=
(
*
taosInitHandle
[
pRpc
->
connType
])(
ip
,
pInit
->
localPort
,
pRpc
->
label
,
pRpc
->
numOfThreads
,
NULL
,
pRpc
);
if
(
pRpc
->
tcphandle
==
NULL
)
{
if
(
pRpc
->
tcphandle
==
NULL
)
{
taosMemoryFree
(
pRpc
);
taosMemoryFree
(
pRpc
);
return
NULL
;
return
NULL
;
...
...
source/libs/transport/src/transSrv.c
浏览文件 @
a5e9b14d
...
@@ -817,7 +817,6 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
...
@@ -817,7 +817,6 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
srv
->
pipe
[
i
]
=
(
uv_pipe_t
*
)
taosMemoryCalloc
(
2
,
sizeof
(
uv_pipe_t
));
srv
->
pipe
[
i
]
=
(
uv_pipe_t
*
)
taosMemoryCalloc
(
2
,
sizeof
(
uv_pipe_t
));
uv_os_sock_t
fds
[
2
];
uv_os_sock_t
fds
[
2
];
if
(
uv_socketpair
(
SOCK_STREAM
,
0
,
fds
,
UV_NONBLOCK_PIPE
,
UV_NONBLOCK_PIPE
)
!=
0
)
{
if
(
uv_socketpair
(
SOCK_STREAM
,
0
,
fds
,
UV_NONBLOCK_PIPE
,
UV_NONBLOCK_PIPE
)
!=
0
)
{
goto
End
;
goto
End
;
...
@@ -841,6 +840,10 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
...
@@ -841,6 +840,10 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
goto
End
;
goto
End
;
}
}
}
}
if
(
false
==
taosValidIpAndPort
(
srv
->
ip
,
srv
->
port
))
{
tError
(
"failed to bind, reason: %s"
,
terrstr
());
goto
End
;
}
if
(
false
==
addHandleToAcceptloop
(
srv
))
{
if
(
false
==
addHandleToAcceptloop
(
srv
))
{
goto
End
;
goto
End
;
}
}
...
...
source/os/src/osSocket.c
浏览文件 @
a5e9b14d
...
@@ -638,6 +638,48 @@ int32_t taosKeepTcpAlive(TdSocketPtr pSocket) {
...
@@ -638,6 +638,48 @@ int32_t taosKeepTcpAlive(TdSocketPtr pSocket) {
return
0
;
return
0
;
}
}
bool
taosValidIpAndPort
(
uint32_t
ip
,
uint16_t
port
)
{
struct
sockaddr_in
serverAdd
;
SocketFd
fd
;
int32_t
reuse
;
// printf("open tcp server socket:0x%x:%hu", ip, port);
bzero
((
char
*
)
&
serverAdd
,
sizeof
(
serverAdd
));
serverAdd
.
sin_family
=
AF_INET
;
serverAdd
.
sin_addr
.
s_addr
=
ip
;
serverAdd
.
sin_port
=
(
uint16_t
)
htons
(
port
);
if
((
fd
=
socket
(
AF_INET
,
SOCK_STREAM
,
IPPROTO_TCP
))
<=
2
)
{
// printf("failed to open TCP socket: %d (%s)", errno, strerror(errno));
taosCloseSocketNoCheck1
(
fd
);
return
false
;
}
TdSocketPtr
pSocket
=
(
TdSocketPtr
)
taosMemoryMalloc
(
sizeof
(
TdSocket
));
if
(
pSocket
==
NULL
)
{
taosCloseSocketNoCheck1
(
fd
);
return
false
;
}
pSocket
->
refId
=
0
;
pSocket
->
fd
=
fd
;
/* set REUSEADDR option, so the portnumber can be re-used */
reuse
=
1
;
if
(
taosSetSockOpt
(
pSocket
,
SOL_SOCKET
,
SO_REUSEADDR
,
(
void
*
)
&
reuse
,
sizeof
(
reuse
))
<
0
)
{
// printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
taosCloseSocket
(
&
pSocket
);
return
NULL
;
}
/* bind socket to server address */
if
(
bind
(
pSocket
->
fd
,
(
struct
sockaddr
*
)
&
serverAdd
,
sizeof
(
serverAdd
))
<
0
)
{
// printf("bind tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
taosCloseSocket
(
&
pSocket
);
return
false
;
}
taosCloseSocket
(
&
pSocket
);
return
true
;
}
TdSocketServerPtr
taosOpenTcpServerSocket
(
uint32_t
ip
,
uint16_t
port
)
{
TdSocketServerPtr
taosOpenTcpServerSocket
(
uint32_t
ip
,
uint16_t
port
)
{
struct
sockaddr_in
serverAdd
;
struct
sockaddr_in
serverAdd
;
SocketFd
fd
;
SocketFd
fd
;
...
...
tests/system-test/1-insert/insertWithMoreVgroup.py
0 → 100644
浏览文件 @
a5e9b14d
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import
sys
import
threading
import
multiprocessing
as
mp
from
numpy.lib.function_base
import
insert
import
taos
from
util.log
import
*
from
util.cases
import
*
from
util.sql
import
*
import
numpy
as
np
import
datetime
as
dt
import
time
# constant define
WAITS
=
5
# wait seconds
class
TDTestCase
:
#
# --------------- main frame -------------------
#
def
caseDescription
(
self
):
'''
limit and offset keyword function test cases;
case1: limit offset base function test
case2: offset return valid
'''
return
def
getBuildPath
(
self
):
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
if
(
"community"
in
selfPath
):
projPath
=
selfPath
[:
selfPath
.
find
(
"community"
)]
else
:
projPath
=
selfPath
[:
selfPath
.
find
(
"tests"
)]
for
root
,
dirs
,
files
in
os
.
walk
(
projPath
):
if
(
"taosd"
in
files
):
rootRealPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
root
))
if
(
"packaging"
not
in
rootRealPath
):
buildPath
=
root
[:
len
(
root
)
-
len
(
"/build/bin"
)]
break
return
buildPath
# init
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
"start to execute %s"
%
__file__
)
tdSql
.
init
(
conn
.
cursor
())
# tdSql.prepare()
# self.create_tables();
self
.
ts
=
1500000000000
# run case
def
run
(
self
):
# test base case
self
.
test_case1
()
tdLog
.
debug
(
" LIMIT test_case1 ............ [OK]"
)
# test advance case
# self.test_case2()
# tdLog.debug(" LIMIT test_case2 ............ [OK]")
# stop
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
"%s successfully executed"
%
__file__
)
# --------------- case -------------------
# create tables
def
create_tables
(
self
,
dbname
,
stbname
,
count
):
tdSql
.
execute
(
"use %s"
%
dbname
)
tdSql
.
execute
(
"create stable %s(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)"
%
stbname
)
pre_create
=
"create table"
sql
=
pre_create
tdLog
.
debug
(
"doing create one stable %s and %d child table in %s ..."
%
(
stbname
,
count
,
dbname
))
# print(time.time())
exeStartTime
=
time
.
time
()
for
i
in
range
(
count
):
sql
+=
" %s_%d using %s tags(%d)"
%
(
stbname
,
i
,
stbname
,
i
+
1
)
if
i
>
0
and
i
%
3000
==
0
:
tdSql
.
execute
(
sql
)
sql
=
pre_create
# print(time.time())
# end sql
if
sql
!=
pre_create
:
tdSql
.
execute
(
sql
)
exeEndTime
=
time
.
time
()
spendTime
=
exeEndTime
-
exeStartTime
speedCreate
=
count
/
spendTime
tdLog
.
debug
(
"spent %.2fs to create 1 stable and %d table, create speed is %.2f table/s... [OK]"
%
(
spendTime
,
count
,
speedCreate
))
return
def
newcur
(
self
,
host
,
cfg
):
user
=
"root"
password
=
"taosdata"
port
=
6030
con
=
taos
.
connect
(
host
=
host
,
user
=
user
,
password
=
password
,
config
=
cfg
,
port
=
port
)
cur
=
con
.
cursor
()
print
(
cur
)
return
cur
def
new_create_tables
(
self
,
dbname
,
vgroups
,
stbname
,
tcountStart
,
tcountStop
):
host
=
"chenhaoran02"
buildPath
=
self
.
getBuildPath
()
config
=
buildPath
+
"../sim/dnode1/cfg/"
tsql
=
self
.
newcur
(
host
,
config
)
tsql
.
execute
(
"create database %s vgroups %d"
%
(
dbname
,
vgroups
))
tsql
.
execute
(
"use %s"
%
dbname
)
tsql
.
execute
(
"create stable %s(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)"
%
stbname
)
pre_create
=
"create table"
sql
=
pre_create
tcountStop
=
int
(
tcountStop
)
tcountStart
=
int
(
tcountStart
)
count
=
tcountStop
-
tcountStart
tdLog
.
debug
(
"doing create one stable %s and %d child table in %s ..."
%
(
stbname
,
count
,
dbname
))
# print(time.time())
exeStartTime
=
time
.
time
()
# print(type(tcountStop),type(tcountStart))
for
i
in
range
(
tcountStart
,
tcountStop
):
sql
+=
" %s_%d using %s tags(%d)"
%
(
stbname
,
i
,
stbname
,
i
+
1
)
if
i
>
0
and
i
%
20000
==
0
:
# print(sql)
tsql
.
execute
(
sql
)
sql
=
pre_create
# print(time.time())
# end sql
if
sql
!=
pre_create
:
# print(sql)
tsql
.
execute
(
sql
)
exeEndTime
=
time
.
time
()
spendTime
=
exeEndTime
-
exeStartTime
speedCreate
=
count
/
spendTime
# tdLog.debug("spent %.2fs to create 1 stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,count,speedCreate))
return
# insert data
def
insert_data
(
self
,
dbname
,
stbname
,
ts_start
,
tcountStart
,
tcountStop
,
rowCount
):
tdSql
.
execute
(
"use %s"
%
dbname
)
pre_insert
=
"insert into "
sql
=
pre_insert
tcount
=
tcountStop
-
tcountStart
allRows
=
tcount
*
rowCount
tdLog
.
debug
(
"doing insert data into stable:%s rows:%d ..."
%
(
stbname
,
allRows
))
exeStartTime
=
time
.
time
()
for
i
in
range
(
tcountStart
,
tcountStop
):
sql
+=
" %s_%d values "
%
(
stbname
,
i
)
for
j
in
range
(
rowCount
):
sql
+=
"(%d, %d, 'taos_%d') "
%
(
ts_start
+
j
*
1000
,
j
,
j
)
if
j
>
0
and
j
%
5000
==
0
:
# print(sql)
tdSql
.
execute
(
sql
)
sql
=
"insert into %s_%d values "
%
(
stbname
,
i
)
# end sql
if
sql
!=
pre_insert
:
# print(sql)
tdSql
.
execute
(
sql
)
exeEndTime
=
time
.
time
()
spendTime
=
exeEndTime
-
exeStartTime
speedInsert
=
allRows
/
spendTime
# tdLog.debug("spent %.2fs to INSERT %d rows , insert rate is %.2f rows/s... [OK]"% (spendTime,allRows,speedInsert))
tdLog
.
debug
(
"INSERT TABLE DATA ............ [OK]"
)
return
# test case1 base
def
test_case1
(
self
):
tdLog
.
debug
(
"-----create database and tables test------- "
)
tdSql
.
execute
(
"drop database if exists db1"
)
tdSql
.
execute
(
"drop database if exists db4"
)
tdSql
.
execute
(
"drop database if exists db6"
)
tdSql
.
execute
(
"drop database if exists db8"
)
tdSql
.
execute
(
"drop database if exists db12"
)
tdSql
.
execute
(
"drop database if exists db16"
)
#create database and tables;
# tdSql.execute("create database db11 vgroups 1")
# # self.create_tables("db1", "stb1", 30*10000)
# tdSql.execute("use db1")
# tdSql.execute("create stable stb1(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)")
# tdSql.execute("create database db12 vgroups 1")
# # self.create_tables("db1", "stb1", 30*10000)
# tdSql.execute("use db1")
# t1 = threading.Thread(target=self.new_create_tables("db1", "stb1", 15*10000), args=(1,))
# t2 = threading.Thread(target=self.new_create_tables("db1", "stb1", 15*10000), args=(2,))
# t1 = mp.Process(target=self.new_create_tables, args=("db1", "stb1", 0,count/2,))
# t2 = mp.Process(target=self.new_create_tables, args=("db1", "stb1", count/2,count,))
count
=
50000
vgroups
=
1
threads
=
[]
threadNumbers
=
2
for
i
in
range
(
threadNumbers
):
threads
.
append
(
mp
.
Process
(
target
=
self
.
new_create_tables
,
args
=
(
"db1%d"
%
i
,
vgroups
,
"stb1"
,
0
,
count
,)))
start_time
=
time
.
time
()
for
tr
in
threads
:
tr
.
start
()
for
tr
in
threads
:
tr
.
join
()
end_time
=
time
.
time
()
spendTime
=
end_time
-
start_time
speedCreate
=
count
/
spendTime
tdLog
.
debug
(
"spent %.2fs to create 1 stable and %d table, create speed is %.2f table/s... [OK]"
%
(
spendTime
,
count
,
speedCreate
))
# self.new_create_tables("db1", "stb1", 15*10000)
# self.new_create_tables("db1", "stb1", 15*10000)
# tdSql.execute("create database db4 vgroups 4")
# self.create_tables("db4", "stb4", 30*10000)
# tdSql.execute("create database db6 vgroups 6")
# self.create_tables("db6", "stb6", 30*10000)
# tdSql.execute("create database db8 vgroups 8")
# self.create_tables("db8", "stb8", 30*10000)
# tdSql.execute("create database db12 vgroups 12")
# self.create_tables("db12", "stb12", 30*10000)
# tdSql.execute("create database db16 vgroups 16")
# self.create_tables("db16", "stb16", 30*10000)
return
# test case2 base:insert data
def
test_case2
(
self
):
tdLog
.
debug
(
"-----insert data test------- "
)
# drop database
tdSql
.
execute
(
"drop database if exists db1"
)
tdSql
.
execute
(
"drop database if exists db4"
)
tdSql
.
execute
(
"drop database if exists db6"
)
tdSql
.
execute
(
"drop database if exists db8"
)
tdSql
.
execute
(
"drop database if exists db12"
)
tdSql
.
execute
(
"drop database if exists db16"
)
#create database and tables;
tdSql
.
execute
(
"create database db1 vgroups 1"
)
self
.
create_tables
(
"db1"
,
"stb1"
,
1
*
100
)
self
.
insert_data
(
"db1"
,
"stb1"
,
self
.
ts
,
1
*
50
,
1
*
10000
)
tdSql
.
execute
(
"create database db4 vgroups 4"
)
self
.
create_tables
(
"db4"
,
"stb4"
,
1
*
100
)
self
.
insert_data
(
"db4"
,
"stb4"
,
self
.
ts
,
1
*
100
,
1
*
10000
)
tdSql
.
execute
(
"create database db6 vgroups 6"
)
self
.
create_tables
(
"db6"
,
"stb6"
,
1
*
100
)
self
.
insert_data
(
"db6"
,
"stb6"
,
self
.
ts
,
1
*
100
,
1
*
10000
)
tdSql
.
execute
(
"create database db8 vgroups 8"
)
self
.
create_tables
(
"db8"
,
"stb8"
,
1
*
100
)
self
.
insert_data
(
"db8"
,
"stb8"
,
self
.
ts
,
1
*
100
,
1
*
10000
)
tdSql
.
execute
(
"create database db12 vgroups 12"
)
self
.
create_tables
(
"db12"
,
"stb12"
,
1
*
100
)
self
.
insert_data
(
"db12"
,
"stb12"
,
self
.
ts
,
1
*
100
,
1
*
10000
)
tdSql
.
execute
(
"create database db16 vgroups 16"
)
self
.
create_tables
(
"db16"
,
"stb16"
,
1
*
100
)
self
.
insert_data
(
"db16"
,
"stb16"
,
self
.
ts
,
1
*
100
,
1
*
10000
)
return
#
# add case with filename
#
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
\ No newline at end of file
tests/system-test/2-query/smaTest.py
0 → 100644
浏览文件 @
a5e9b14d
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import
sys
from
numpy.lib.function_base
import
insert
import
taos
from
util.log
import
*
from
util.cases
import
*
from
util.sql
import
*
import
numpy
as
np
# constant define
WAITS
=
5
# wait seconds
class
TDTestCase
:
#
# --------------- main frame -------------------
#
# updatecfgDict = {'debugFlag': 135}
# updatecfgDict = {'fqdn': 135}
def
caseDescription
(
self
):
'''
limit and offset keyword function test cases;
case1: limit offset base function test
case2: offset return valid
'''
return
# init
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
"start to execute %s"
%
__file__
)
tdSql
.
init
(
conn
.
cursor
())
tdSql
.
prepare
()
self
.
create_tables
();
self
.
ts
=
1500000000000
# run case
def
run
(
self
):
# insert data
self
.
insert_data1
(
"t1"
,
self
.
ts
,
1000
*
10000
)
self
.
insert_data1
(
"t4"
,
self
.
ts
,
1000
*
10000
)
# test base case
# self.test_case1()
tdLog
.
debug
(
" LIMIT test_case1 ............ [OK]"
)
# test advance case
# self.test_case2()
tdLog
.
debug
(
" LIMIT test_case2 ............ [OK]"
)
# stop
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
"%s successfully executed"
%
__file__
)
#
# --------------- case -------------------
#
# create table
def
create_tables
(
self
):
# super table
tdSql
.
execute
(
"create table st(ts timestamp, i1 int,i2 int) tags(area int)"
);
# child table
tdSql
.
execute
(
"create table t1 using st tags(1)"
);
tdSql
.
execute
(
"create table st1(ts timestamp, i1 int ,i2 int) tags(area int) sma(i2) "
);
tdSql
.
execute
(
"create table t4 using st1 tags(1)"
);
return
# insert data1
def
insert_data
(
self
,
tbname
,
ts_start
,
count
):
pre_insert
=
"insert into %s values"
%
tbname
sql
=
pre_insert
tdLog
.
debug
(
"doing insert table %s rows=%d ..."
%
(
tbname
,
count
))
for
i
in
range
(
count
):
sql
+=
" (%d,%d)"
%
(
ts_start
+
i
*
1000
,
i
)
if
i
>
0
and
i
%
30000
==
0
:
tdSql
.
execute
(
sql
)
sql
=
pre_insert
# end sql
if
sql
!=
pre_insert
:
tdSql
.
execute
(
sql
)
tdLog
.
debug
(
"INSERT TABLE DATA ............ [OK]"
)
return
def
insert_data1
(
self
,
tbname
,
ts_start
,
count
):
pre_insert
=
"insert into %s values"
%
tbname
sql
=
pre_insert
tdLog
.
debug
(
"doing insert table %s rows=%d ..."
%
(
tbname
,
count
))
for
i
in
range
(
count
):
sql
+=
" (%d,%d,%d)"
%
(
ts_start
+
i
*
1000
,
i
,
i
+
1
)
if
i
>
0
and
i
%
30000
==
0
:
tdSql
.
execute
(
sql
)
sql
=
pre_insert
# end sql
if
sql
!=
pre_insert
:
tdSql
.
execute
(
sql
)
tdLog
.
debug
(
"INSERT TABLE DATA ............ [OK]"
)
return
# test case1 base
# def test_case1(self):
# #
# # limit base function
# #
# # base no where
# sql = "select * from t1 limit 10"
# tdSql.waitedQuery(sql, 10, WAITS)
#
# add case with filename
#
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
\ No newline at end of file
tools/shell/src/shellEngine.c
浏览文件 @
a5e9b14d
...
@@ -141,6 +141,9 @@ int32_t shellRunCommand(char *command) {
...
@@ -141,6 +141,9 @@ int32_t shellRunCommand(char *command) {
*
p
++
=
'\\'
;
*
p
++
=
'\\'
;
}
}
break
;
break
;
default:
*
p
++
=
'\\'
;
break
;
}
}
*
p
++
=
c
;
*
p
++
=
c
;
esc
=
false
;
esc
=
false
;
...
...
tools/shell/src/shellNettest.c
浏览文件 @
a5e9b14d
...
@@ -21,7 +21,7 @@ static void shellWorkAsClient() {
...
@@ -21,7 +21,7 @@ static void shellWorkAsClient() {
SRpcInit
rpcInit
=
{
0
};
SRpcInit
rpcInit
=
{
0
};
SEpSet
epSet
=
{.
inUse
=
0
,
.
numOfEps
=
1
};
SEpSet
epSet
=
{.
inUse
=
0
,
.
numOfEps
=
1
};
SRpcMsg
rpcRsp
=
{
0
};
SRpcMsg
rpcRsp
=
{
0
};
void
*
clientRpc
=
NULL
;
void
*
clientRpc
=
NULL
;
char
pass
[
TSDB_PASSWORD_LEN
+
1
]
=
{
0
};
char
pass
[
TSDB_PASSWORD_LEN
+
1
]
=
{
0
};
taosEncryptPass_c
((
uint8_t
*
)(
"_pwd"
),
strlen
(
"_pwd"
),
pass
);
taosEncryptPass_c
((
uint8_t
*
)(
"_pwd"
),
strlen
(
"_pwd"
),
pass
);
...
@@ -116,6 +116,7 @@ static void shellWorkAsServer() {
...
@@ -116,6 +116,7 @@ static void shellWorkAsServer() {
}
}
SRpcInit
rpcInit
=
{
0
};
SRpcInit
rpcInit
=
{
0
};
memcpy
(
rpcInit
.
localFqdn
,
tsLocalFqdn
,
strlen
(
tsLocalFqdn
));
rpcInit
.
localPort
=
pArgs
->
port
;
rpcInit
.
localPort
=
pArgs
->
port
;
rpcInit
.
label
=
"CHK"
;
rpcInit
.
label
=
"CHK"
;
rpcInit
.
numOfThreads
=
tsNumOfRpcThreads
;
rpcInit
.
numOfThreads
=
tsNumOfRpcThreads
;
...
@@ -126,7 +127,7 @@ static void shellWorkAsServer() {
...
@@ -126,7 +127,7 @@ static void shellWorkAsServer() {
void
*
serverRpc
=
rpcOpen
(
&
rpcInit
);
void
*
serverRpc
=
rpcOpen
(
&
rpcInit
);
if
(
serverRpc
==
NULL
)
{
if
(
serverRpc
==
NULL
)
{
printf
(
"failed to init net test server since %s"
,
terrstr
());
printf
(
"failed to init net test server since %s
\n
"
,
terrstr
());
}
else
{
}
else
{
printf
(
"network test server is initialized, port:%u
\n
"
,
pArgs
->
port
);
printf
(
"network test server is initialized, port:%u
\n
"
,
pArgs
->
port
);
taosSetSignal
(
SIGTERM
,
shellNettestHandler
);
taosSetSignal
(
SIGTERM
,
shellNettestHandler
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录