提交 1c5fda4b 编写于 作者: S Shengliang Guan

TD-10431 drop dnode test

上级 b67074c0
......@@ -448,7 +448,7 @@ static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { a
static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); }
static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) {
dDebug("config msg is received");
dError("config msg is received, but not supported yet");
SCfgDnodeMsg *pCfg = pMsg->pCont;
int32_t code = TSDB_CODE_OPS_NOT_SUPPORT;
......@@ -585,4 +585,5 @@ void dndProcessDnodeRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
default:
dError("RPC %p, dnode rsp:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]);
}
rpcFreeCont(pMsg->pCont);
}
......@@ -559,9 +559,12 @@ static void dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
break;
}
SRpcMsg rsp = {.code = code, .handle = pMsg->handle};
rpcSendResponse(&rsp);
if (pMsg->msgType & 1u) {
SRpcMsg rsp = {.code = code, .handle = pMsg->handle};
rpcSendResponse(&rsp);
}
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
taosFreeQitem(pMsg);
}
......@@ -645,9 +648,12 @@ void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) {
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg));
if (pMsg == NULL || taosWriteQitem(pMgmt->pMgmtQ, pMsg) != 0) {
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY};
rpcSendResponse(&rsp);
if (pRpcMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY};
rpcSendResponse(&rsp);
}
rpcFreeCont(pRpcMsg->pCont);
pRpcMsg->pCont = NULL;
taosFreeQitem(pMsg);
}
}
......@@ -656,9 +662,12 @@ void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pWriteQ, pMsg) != 0) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
rpcSendResponse(&rsp);
if (pMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
rpcSendResponse(&rsp);
}
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
dndReleaseMnode(pDnode, pMnode);
......@@ -668,9 +677,12 @@ void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pSyncQ, pMsg) != 0) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
rpcSendResponse(&rsp);
if (pMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
rpcSendResponse(&rsp);
}
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
dndReleaseMnode(pDnode, pMnode);
......@@ -680,9 +692,12 @@ void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pReadQ, pMsg) != 0) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
rpcSendResponse(&rsp);
if (pMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
rpcSendResponse(&rsp);
}
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
dndReleaseMnode(pDnode, pMnode);
......
......@@ -141,8 +141,8 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
dTrace("RPC %p, rsp:%s is processed, code:0x%0X", pMsg->handle, taosMsg[msgType], pMsg->code & 0XFFFF);
} else {
dError("RPC %p, rsp:%s not processed", pMsg->handle, taosMsg[msgType]);
rpcFreeCont(pMsg->pCont);
}
rpcFreeCont(pMsg->pCont);
}
static int32_t dndInitClient(SDnode *pDnode) {
......
......@@ -202,6 +202,22 @@ TEST_F(DndTestDnode, ShowDnode) {
CheckBinary("", 24);
}
TEST_F(DndTestDnode, ConfigDnode_01) {
SCfgDnodeMsg* pReq = (SCfgDnodeMsg*)rpcMallocCont(sizeof(SCfgDnodeMsg));
pReq->dnodeId = htonl(1);
strcpy(pReq->config, "ddebugflag 131");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCfgDnodeMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CONFIG_DNODE;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
TEST_F(DndTestDnode, CreateDnode_01) {
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(sizeof(SCreateDnodeMsg));
strcpy(pReq->ep, "localhost:9522");
......@@ -235,77 +251,6 @@ TEST_F(DndTestDnode, CreateDnode_01) {
CheckBinary("", 24);
}
#if 0
TEST_F(DndTestDnode, AlterUser_01) {
ASSERT_NE(pClient, nullptr);
//--- drop user ---
SAlterUserMsg* pReq = (SAlterUserMsg*)rpcMallocCont(sizeof(SAlterUserMsg));
strcpy(pReq->user, "u1");
strcpy(pReq->pass, "p2");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SAlterUserMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_ALTER_USER;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
//--- meta ---
SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
pShow->type = TSDB_MGMT_TABLE_USER;
SRpcMsg showRpcMsg = {0};
showRpcMsg.pCont = pShow;
showRpcMsg.contLen = sizeof(SShowMsg);
showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
sendMsg(pClient, &showRpcMsg);
SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont;
STableMetaMsg* pMeta = &pShowRsp->tableMeta;
pMeta->numOfColumns = htons(pMeta->numOfColumns);
EXPECT_EQ(pMeta->numOfColumns, 4);
//--- retrieve ---
SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
pRetrieve->showId = pShowRsp->showId;
SRpcMsg retrieveRpcMsg = {0};
retrieveRpcMsg.pCont = pRetrieve;
retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg);
retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
sendMsg(pClient, &retrieveRpcMsg);
SRetrieveTableRsp* pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont;
pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
EXPECT_EQ(pRetrieveRsp->numOfRows, 3);
char* pData = pRetrieveRsp->data;
int32_t pos = 0;
char* strVal = NULL;
//--- name ---
{
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += TSDB_USER_LEN;
EXPECT_STREQ(strVal, "u1");
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += TSDB_USER_LEN;
EXPECT_STREQ(strVal, "root");
pos += sizeof(VarDataLenT);
strVal = (char*)(pData + pos);
pos += TSDB_USER_LEN;
EXPECT_STREQ(strVal, "_root");
}
}
#endif
TEST_F(DndTestDnode, DropDnode_01) {
SDropDnodeMsg* pReq = (SDropDnodeMsg*)rpcMallocCont(sizeof(SDropDnodeMsg));
pReq->dnodeId = htonl(2);
......@@ -409,4 +354,4 @@ TEST_F(DndTestDnode, CreateDnode_02) {
CheckBinary("", 24);
CheckBinary("", 24);
CheckBinary("", 24);
}
\ No newline at end of file
}
......@@ -336,8 +336,8 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
}
SRpcConnInfo connInfo = {0};
if (rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) {
mndCleanupMsg(pMsg);
if ((pRpcMsg->msgType & 1U) && rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) {
taosFreeQitem(pMsg);
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
mError("RPC:%p, app:%p failed to create msg since %s", pRpcMsg->handle, pRpcMsg->ahandle, terrstr());
return NULL;
......@@ -354,6 +354,8 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
void mndCleanupMsg(SMnodeMsg *pMsg) {
mTrace("msg:%p, app:%p is destroyed, RPC:%p", pMsg, pMsg->rpcMsg.ahandle, pMsg->rpcMsg.handle);
rpcFreeCont(pMsg->rpcMsg.pCont);
pMsg->rpcMsg.pCont = NULL;
taosFreeQitem(pMsg);
}
......@@ -367,7 +369,7 @@ static void mndProcessRpcMsg(SMnodeMsg *pMsg) {
int32_t code = 0;
int32_t msgType = pMsg->rpcMsg.msgType;
void *ahandle = pMsg->rpcMsg.ahandle;
bool isReq = (msgType % 2 == 1);
bool isReq = (msgType & 1U);
mTrace("msg:%p, app:%p type:%s will be processed", pMsg, ahandle, taosMsg[msgType]);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册