未验证 提交 51d7eddf 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9347 from taosdata/feature/dnode3

Feature/dnode3
...@@ -86,5 +86,6 @@ SpacesInSquareBrackets: false ...@@ -86,5 +86,6 @@ SpacesInSquareBrackets: false
Standard: Auto Standard: Auto
TabWidth: 8 TabWidth: 8
UseTab: Never UseTab: Never
AlignConsecutiveDeclarations: true
... ...
...@@ -922,18 +922,15 @@ typedef struct SShowRsp { ...@@ -922,18 +922,15 @@ typedef struct SShowRsp {
typedef struct { typedef struct {
char ep[TSDB_EP_LEN]; // end point, hostname:port char ep[TSDB_EP_LEN]; // end point, hostname:port
int32_t reserve[8];
} SCreateDnodeMsg; } SCreateDnodeMsg;
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
int32_t reserve[8];
} SDropDnodeMsg; } SDropDnodeMsg;
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
char config[TSDB_DNODE_CONFIG_LEN]; char config[TSDB_DNODE_CONFIG_LEN];
int32_t reserve[8];
} SCfgDnodeMsg; } SCfgDnodeMsg;
typedef struct { typedef struct {
...@@ -942,7 +939,6 @@ typedef struct { ...@@ -942,7 +939,6 @@ typedef struct {
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
int8_t align[3];
int8_t replica; int8_t replica;
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
} SCreateMnodeInMsg, SAlterMnodeInMsg; } SCreateMnodeInMsg, SAlterMnodeInMsg;
......
...@@ -349,7 +349,7 @@ static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) { ...@@ -349,7 +349,7 @@ static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) {
SReplica *pReplica = &pOption->replicas[0]; SReplica *pReplica = &pOption->replicas[0];
pReplica->id = 1; pReplica->id = 1;
pReplica->port = pDnode->opt.serverPort; pReplica->port = pDnode->opt.serverPort;
tstrncpy(pReplica->fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN); memcpy(pReplica->fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN);
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->selfIndex = pOption->selfIndex; pMgmt->selfIndex = pOption->selfIndex;
...@@ -376,7 +376,7 @@ static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SC ...@@ -376,7 +376,7 @@ static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SC
SReplica *pReplica = &pOption->replicas[i]; SReplica *pReplica = &pOption->replicas[i];
pReplica->id = pMsg->replicas[i].id; pReplica->id = pMsg->replicas[i].id;
pReplica->port = pMsg->replicas[i].port; pReplica->port = pMsg->replicas[i].port;
tstrncpy(pReplica->fqdn, pMsg->replicas[i].fqdn, TSDB_FQDN_LEN); memcpy(pReplica->fqdn, pMsg->replicas[i].fqdn, TSDB_FQDN_LEN);
if (pReplica->id == pOption->dnodeId) { if (pReplica->id == pOption->dnodeId) {
pOption->selfIndex = i; pOption->selfIndex = i;
} }
...@@ -479,9 +479,11 @@ static int32_t dndDropMnode(SDnode *pDnode) { ...@@ -479,9 +479,11 @@ static int32_t dndDropMnode(SDnode *pDnode) {
return -1; return -1;
} }
dndReleaseMnode(pDnode, pMnode);
dndStopMnodeWorker(pDnode); dndStopMnodeWorker(pDnode);
dndWriteMnodeFile(pDnode); dndWriteMnodeFile(pDnode);
mndClose(pMnode); mndClose(pMnode);
pMgmt->pMnode = NULL;
mndDestroy(pDnode->dir.mnode); mndDestroy(pDnode->dir.mnode);
return 0; return 0;
...@@ -499,7 +501,7 @@ static SCreateMnodeInMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) { ...@@ -499,7 +501,7 @@ static SCreateMnodeInMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) {
} }
static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
SCreateMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg->pCont); SCreateMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg);
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_MNODE_ID_INVALID; terrno = TSDB_CODE_DND_MNODE_ID_INVALID;
...@@ -515,18 +517,23 @@ static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { ...@@ -515,18 +517,23 @@ static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
} }
static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
SAlterMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg->pCont); SAlterMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg);
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_MNODE_ID_INVALID; terrno = TSDB_CODE_DND_MNODE_ID_INVALID;
return -1; return -1;
} else {
SMnodeOpt option = {0};
if (dndBuildMnodeOptionFromMsg(pDnode, &option, pMsg) != 0) {
return -1;
}
return dndAlterMnode(pDnode, &option);
} }
SMnodeOpt option = {0};
if (dndBuildMnodeOptionFromMsg(pDnode, &option, pMsg) != 0) {
return -1;
}
if (dndAlterMnode(pDnode, &option) != 0) {
return -1;
}
return dndWriteMnodeFile(pDnode);
} }
static int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { static int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
...@@ -555,16 +562,17 @@ static void dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { ...@@ -555,16 +562,17 @@ static void dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
code = dndProcessDropMnodeReq(pDnode, pMsg); code = dndProcessDropMnodeReq(pDnode, pMsg);
break; break;
default: default:
code = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
code = -1;
break; break;
} }
if (pMsg->msgType & 1u) { if (pMsg->msgType & 1u) {
if (code != 0) code = terrno;
SRpcMsg rsp = {.code = code, .handle = pMsg->handle}; SRpcMsg rsp = {.code = code, .handle = pMsg->handle};
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
} }
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
...@@ -625,8 +633,6 @@ static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg) { ...@@ -625,8 +633,6 @@ static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
} }
static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg) { static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg) {
assert(pQueue);
SMnodeMsg *pMsg = mndInitMsg(pMnode, pRpcMsg); SMnodeMsg *pMsg = mndInitMsg(pMnode, pRpcMsg);
if (pMsg == NULL) { if (pMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -647,15 +653,18 @@ void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) { ...@@ -647,15 +653,18 @@ void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) {
SMnode *pMnode = dndAcquireMnode(pDnode); SMnode *pMnode = dndAcquireMnode(pDnode);
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg)); SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg));
if (pMsg != NULL) *pMsg = *pRpcMsg;
if (pMsg == NULL || taosWriteQitem(pMgmt->pMgmtQ, pMsg) != 0) { if (pMsg == NULL || taosWriteQitem(pMgmt->pMgmtQ, pMsg) != 0) {
if (pRpcMsg->msgType & 1u) { if (pRpcMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY}; SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY};
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
} }
rpcFreeCont(pRpcMsg->pCont); rpcFreeCont(pRpcMsg->pCont);
pRpcMsg->pCont = NULL;
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
dndReleaseMnode(pDnode, pMnode);
} }
void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
...@@ -894,6 +903,11 @@ int32_t dndInitMnode(SDnode *pDnode) { ...@@ -894,6 +903,11 @@ int32_t dndInitMnode(SDnode *pDnode) {
return -1; return -1;
} }
if (dndAllocMnodeMgmtQueue(pDnode) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
char path[PATH_MAX]; char path[PATH_MAX];
snprintf(path, PATH_MAX, "%s/mnode.json", pDnode->dir.dnode); snprintf(path, PATH_MAX, "%s/mnode.json", pDnode->dir.dnode);
pMgmt->file = strdup(path); pMgmt->file = strdup(path);
...@@ -935,8 +949,9 @@ void dndCleanupMnode(SDnode *pDnode) { ...@@ -935,8 +949,9 @@ void dndCleanupMnode(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
dInfo("dnode-mnode start to clean up"); dInfo("dnode-mnode start to clean up");
dndStopMnodeWorker(pDnode); if (pMgmt->pMnode) dndStopMnodeWorker(pDnode);
dndCleanupMnodeMgmtWorker(pDnode); dndCleanupMnodeMgmtWorker(pDnode);
dndFreeMnodeMgmtQueue(pDnode);
tfree(pMgmt->file); tfree(pMgmt->file);
mndClose(pMgmt->pMnode); mndClose(pMgmt->pMnode);
dInfo("dnode-mnode is cleaned up"); dInfo("dnode-mnode is cleaned up");
......
...@@ -140,7 +140,7 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -140,7 +140,7 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
DndMsgFp fp = pMgmt->msgFp[msgType]; DndMsgFp fp = pMgmt->msgFp[msgType];
if (fp != NULL) { if (fp != NULL) {
(*fp)(pDnode, pMsg, pEpSet); (*fp)(pDnode, pMsg, pEpSet);
dTrace("RPC %p, rsp:%s is processed, code:0x%0X", pMsg->handle, taosMsg[msgType], pMsg->code & 0XFFFF); dTrace("RPC %p, rsp:%s is processed, code:0x%x", pMsg->handle, taosMsg[msgType], pMsg->code & 0XFFFF);
} else { } else {
dError("RPC %p, rsp:%s not processed", pMsg->handle, taosMsg[msgType]); dError("RPC %p, rsp:%s not processed", pMsg->handle, taosMsg[msgType]);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
...@@ -188,7 +188,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -188,7 +188,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) {
int32_t msgType = pMsg->msgType; int32_t msgType = pMsg->msgType;
if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) { if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
dTrace("RPC %p, network test req, app:%p will be processed", pMsg->handle, pMsg->ahandle); dTrace("RPC %p, network test req, app:%p will be processed, code:0x%x", pMsg->handle, pMsg->ahandle, pMsg->code);
dndProcessDnodeReq(pDnode, pMsg, pEpSet); dndProcessDnodeReq(pDnode, pMsg, pEpSet);
return; return;
} }
......
...@@ -7,7 +7,7 @@ add_subdirectory(cluster) ...@@ -7,7 +7,7 @@ add_subdirectory(cluster)
add_subdirectory(db) add_subdirectory(db)
add_subdirectory(dnode) add_subdirectory(dnode)
# add_subdirectory(func) # add_subdirectory(func)
# add_subdirectory(mnode) add_subdirectory(mnode)
add_subdirectory(profile) add_subdirectory(profile)
add_subdirectory(show) add_subdirectory(show)
add_subdirectory(stb) add_subdirectory(stb)
......
aux_source_directory(. MTEST_SRC)
add_executable(dnode_test_mnode ${MTEST_SRC})
target_link_libraries(
dnode_test_mnode
PUBLIC sut
)
add_test(
NAME dnode_test_mnode
COMMAND dnode_test_mnode
)
/**
* @file dnode.cpp
* @author slguan (slguan@taosdata.com)
* @brief DNODE module dnode-msg tests
* @version 0.1
* @date 2021-12-15
*
* @copyright Copyright (c) 2021
*
*/
#include "base.h"
class DndTestMnode : public ::testing::Test {
public:
void SetUp() override {}
void TearDown() override {}
public:
static void SetUpTestSuite() {
test.Init("/tmp/dnode_test_mnode1", 9061);
const char* fqdn = "localhost";
const char* firstEp = "localhost:9061";
server2.Start("/tmp/dnode_test_mnode2", fqdn, 9062, firstEp);
server3.Start("/tmp/dnode_test_mnode3", fqdn, 9063, firstEp);
server4.Start("/tmp/dnode_test_mnode4", fqdn, 9064, firstEp);
server5.Start("/tmp/dnode_test_mnode5", fqdn, 9065, firstEp);
taosMsleep(300);
}
static void TearDownTestSuite() {
server2.Stop();
server3.Stop();
server4.Stop();
server5.Stop();
test.Cleanup();
}
static Testbase test;
static TestServer server2;
static TestServer server3;
static TestServer server4;
static TestServer server5;
};
Testbase DndTestMnode::test;
TestServer DndTestMnode::server2;
TestServer DndTestMnode::server3;
TestServer DndTestMnode::server4;
TestServer DndTestMnode::server5;
TEST_F(DndTestMnode, 01_ShowDnode) {
test.SendShowMetaMsg(TSDB_MGMT_TABLE_MNODE, "");
CHECK_META("show mnodes", 5);
CHECK_SCHEMA(0, TSDB_DATA_TYPE_SMALLINT, 2, "id");
CHECK_SCHEMA(1, TSDB_DATA_TYPE_BINARY, TSDB_EP_LEN + VARSTR_HEADER_SIZE, "endpoint");
CHECK_SCHEMA(2, TSDB_DATA_TYPE_BINARY, 12 + VARSTR_HEADER_SIZE, "role");
CHECK_SCHEMA(3, TSDB_DATA_TYPE_TIMESTAMP, 8, "role_time");
CHECK_SCHEMA(4, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 1);
CheckInt16(1);
CheckBinary("localhost:9061", TSDB_EP_LEN);
CheckBinary("master", 12);
CheckInt64(0);
CheckTimestamp();
}
TEST_F(DndTestMnode, 02_Create_Mnode_Invalid_Id) {
{
int32_t contLen = sizeof(SCreateMnodeMsg);
SCreateMnodeMsg* pReq = (SCreateMnodeMsg*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_MNODE_ALREADY_EXIST);
}
}
TEST_F(DndTestMnode, 03_Create_Mnode_Invalid_Id) {
{
int32_t contLen = sizeof(SCreateMnodeMsg);
SCreateMnodeMsg* pReq = (SCreateMnodeMsg*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_DNODE_NOT_EXIST);
}
}
TEST_F(DndTestMnode, 04_Create_Mnode) {
{
// create dnode
int32_t contLen = sizeof(SCreateDnodeMsg);
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
strcpy(pReq->ep, "localhost:9062");
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
taosMsleep(1300);
test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "");
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 2);
}
{
// create mnode
int32_t contLen = sizeof(SCreateMnodeMsg);
SCreateMnodeMsg* pReq = (SCreateMnodeMsg*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
test.SendShowMetaMsg(TSDB_MGMT_TABLE_MNODE, "");
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 2);
CheckInt16(1);
CheckInt16(2);
CheckBinary("localhost:9061", TSDB_EP_LEN);
CheckBinary("localhost:9062", TSDB_EP_LEN);
CheckBinary("master", 12);
CheckBinary("slave", 12);
CheckInt64(0);
CheckInt64(0);
CheckTimestamp();
CheckTimestamp();
}
{
// drop mnode
int32_t contLen = sizeof(SDropMnodeMsg);
SDropMnodeMsg* pReq = (SDropMnodeMsg*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_DROP_MNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
test.SendShowMetaMsg(TSDB_MGMT_TABLE_MNODE, "");
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 1);
CheckInt16(1);
CheckBinary("localhost:9061", TSDB_EP_LEN);
CheckBinary("master", 12);
CheckInt64(0);
CheckTimestamp();
}
}
// {
// int32_t contLen = sizeof(SDropDnodeMsg);
// SDropDnodeMsg* pReq = (SDropDnodeMsg*)rpcMallocCont(contLen);
// pReq->dnodeId = htonl(2);
// SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_DROP_DNODE, pReq, contLen);
// ASSERT_NE(pMsg, nullptr);
// ASSERT_EQ(pMsg->code, 0);
// }
// test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "");
// CHECK_META("show dnodes", 7);
// test.SendShowRetrieveMsg();
// EXPECT_EQ(test.GetShowRows(), 1);
// CheckInt16(1);
// CheckBinary("localhost:9061", TSDB_EP_LEN);
// CheckInt16(0);
// CheckInt16(1);
// CheckBinary("ready", 10);
// CheckTimestamp();
// CheckBinary("", 24);
// {
// int32_t contLen = sizeof(SCreateDnodeMsg);
// SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
// strcpy(pReq->ep, "localhost:9063");
// SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen);
// ASSERT_NE(pMsg, nullptr);
// ASSERT_EQ(pMsg->code, 0);
// }
// {
// int32_t contLen = sizeof(SCreateDnodeMsg);
// SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
// strcpy(pReq->ep, "localhost:9064");
// SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen);
// ASSERT_NE(pMsg, nullptr);
// ASSERT_EQ(pMsg->code, 0);
// }
// {
// int32_t contLen = sizeof(SCreateDnodeMsg);
// SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
// strcpy(pReq->ep, "localhost:9065");
// SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen);
// ASSERT_NE(pMsg, nullptr);
// ASSERT_EQ(pMsg->code, 0);
// }
// taosMsleep(1300);
// test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "");
// CHECK_META("show dnodes", 7);
// test.SendShowRetrieveMsg();
// EXPECT_EQ(test.GetShowRows(), 4);
// CheckInt16(1);
// CheckInt16(3);
// CheckInt16(4);
// CheckInt16(5);
// CheckBinary("localhost:9061", TSDB_EP_LEN);
// CheckBinary("localhost:9063", TSDB_EP_LEN);
// CheckBinary("localhost:9064", TSDB_EP_LEN);
// CheckBinary("localhost:9065", TSDB_EP_LEN);
// CheckInt16(0);
// CheckInt16(0);
// CheckInt16(0);
// CheckInt16(0);
// CheckInt16(1);
// CheckInt16(1);
// CheckInt16(1);
// CheckInt16(1);
// CheckBinary("ready", 10);
// CheckBinary("ready", 10);
// CheckBinary("ready", 10);
// CheckBinary("ready", 10);
// CheckTimestamp();
// CheckTimestamp();
// CheckTimestamp();
// CheckTimestamp();
// CheckBinary("", 24);
// CheckBinary("", 24);
// CheckBinary("", 24);
// CheckBinary("", 24);
// // restart
// uInfo("stop all server");
// test.Restart();
// server2.Restart();
// server3.Restart();
// server4.Restart();
// server5.Restart();
// taosMsleep(1300);
// test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "");
// CHECK_META("show dnodes", 7);
// test.SendShowRetrieveMsg();
// EXPECT_EQ(test.GetShowRows(), 4);
// CheckInt16(1);
// CheckInt16(3);
// CheckInt16(4);
// CheckInt16(5);
// CheckBinary("localhost:9061", TSDB_EP_LEN);
// CheckBinary("localhost:9063", TSDB_EP_LEN);
// CheckBinary("localhost:9064", TSDB_EP_LEN);
// CheckBinary("localhost:9065", TSDB_EP_LEN);
// CheckInt16(0);
// CheckInt16(0);
// CheckInt16(0);
// CheckInt16(0);
// CheckInt16(1);
// CheckInt16(1);
// CheckInt16(1);
// CheckInt16(1);
// CheckBinary("ready", 10);
// CheckBinary("ready", 10);
// CheckBinary("ready", 10);
// CheckBinary("ready", 10);
// CheckTimestamp();
// CheckTimestamp();
// CheckTimestamp();
// CheckTimestamp();
// CheckBinary("", 24);
// CheckBinary("", 24);
// CheckBinary("", 24);
// CheckBinary("", 24);
// }
...@@ -290,11 +290,6 @@ typedef struct SMnodeMsg { ...@@ -290,11 +290,6 @@ typedef struct SMnodeMsg {
char db[TSDB_FULL_DB_NAME_LEN]; char db[TSDB_FULL_DB_NAME_LEN];
int32_t acctId; int32_t acctId;
SMnode *pMnode; SMnode *pMnode;
int16_t received;
int16_t successed;
int16_t expected;
int16_t retry;
int32_t code;
int64_t createdTime; int64_t createdTime;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
int32_t contLen; int32_t contLen;
......
...@@ -27,6 +27,7 @@ void mndCleanupMnode(SMnode *pMnode); ...@@ -27,6 +27,7 @@ void mndCleanupMnode(SMnode *pMnode);
bool mndIsMnode(SMnode *pMnode, int32_t dnodeId); bool mndIsMnode(SMnode *pMnode, int32_t dnodeId);
void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet); void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet);
char *mndGetRoleStr(int32_t role); char *mndGetRoleStr(int32_t role);
void mndUpdateMnodeRole(SMnode *pMnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -828,9 +828,9 @@ static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) { ...@@ -828,9 +828,9 @@ static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) {
static int32_t mndProcessSyncDbMsg(SMnodeMsg *pMsg) { static int32_t mndProcessSyncDbMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
SSyncDbMsg *pSync = pMsg->rpcMsg.pCont; SSyncDbMsg *pSync = pMsg->rpcMsg.pCont;
SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db); SDbObj *pDb = mndAcquireDb(pMnode, pSync->db);
if (pDb == NULL) { if (pDb == NULL) {
mError("db:%s, failed to process sync db msg since %s", pMsg->db, terrstr()); mError("db:%s, failed to process sync db msg since %s", pSync->db, terrstr());
return -1; return -1;
} }
...@@ -841,9 +841,9 @@ static int32_t mndProcessSyncDbMsg(SMnodeMsg *pMsg) { ...@@ -841,9 +841,9 @@ static int32_t mndProcessSyncDbMsg(SMnodeMsg *pMsg) {
static int32_t mndProcessCompactDbMsg(SMnodeMsg *pMsg) { static int32_t mndProcessCompactDbMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
SCompactDbMsg *pCompact = pMsg->rpcMsg.pCont; SCompactDbMsg *pCompact = pMsg->rpcMsg.pCont;
SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db); SDbObj *pDb = mndAcquireDb(pMnode, pCompact->db);
if (pDb == NULL) { if (pDb == NULL) {
mError("db:%s, failed to process compact db msg since %s", pMsg->db, terrstr()); mError("db:%s, failed to process compact db msg since %s", pCompact->db, terrstr());
return -1; return -1;
} }
......
...@@ -33,4 +33,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { ...@@ -33,4 +33,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
return code; return code;
} }
bool mndIsMaster(SMnode *pMnode) { return true; } bool mndIsMaster(SMnode *pMnode) {
\ No newline at end of file // pMnode->role = TAOS_SYNC_STATE_LEADER;
return true;
}
\ No newline at end of file
...@@ -622,10 +622,10 @@ void mndTransHandleActionRsp(SMnodeMsg *pMsg) { ...@@ -622,10 +622,10 @@ void mndTransHandleActionRsp(SMnodeMsg *pMsg) {
STransAction *pAction = taosArrayGet(pArray, action); STransAction *pAction = taosArrayGet(pArray, action);
if (pAction != NULL) { if (pAction != NULL) {
pAction->msgReceived = 1; pAction->msgReceived = 1;
pAction->errCode = pMsg->code; pAction->errCode = pMsg->rpcMsg.code;
} }
mDebug("trans:%d, action:%d response is received, code:0x%x", transId, action, pMsg->code); mDebug("trans:%d, action:%d response is received, code:0x%x", transId, action, pMsg->rpcMsg.code);
mndTransExecute(pMnode, pTrans); mndTransExecute(pMnode, pTrans);
HANDLE_ACTION_RSP_OVER: HANDLE_ACTION_RSP_OVER:
...@@ -696,7 +696,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA ...@@ -696,7 +696,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
for (int32_t action = 0; action < numOfActions; ++action) { for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pArray, action); STransAction *pAction = taosArrayGet(pArray, action);
if (pAction == NULL) continue; if (pAction == NULL) continue;
if (pAction->msgSent) continue; if (pAction->msgReceived && pAction->errCode == 0) continue;
int64_t signature = pTrans->id; int64_t signature = pTrans->id;
signature = (signature << 32); signature = (signature << 32);
...@@ -736,6 +736,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA ...@@ -736,6 +736,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
terrno = errorCode; terrno = errorCode;
return errorCode; return errorCode;
} else { } else {
mDebug("trans:%d, %d of %d actions executed, code:0x%x", pTrans->id, numOfReceivedMsgs, numOfActions, errorCode);
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
} }
......
...@@ -178,8 +178,10 @@ static int32_t mndExecSteps(SMnode *pMnode) { ...@@ -178,8 +178,10 @@ static int32_t mndExecSteps(SMnode *pMnode) {
// (*pMnode->reportProgress)(pStep->name, "start initialize"); // (*pMnode->reportProgress)(pStep->name, "start initialize");
if ((*pStep->initFp)(pMnode) != 0) { if ((*pStep->initFp)(pMnode) != 0) {
int32_t code = terrno;
mError("step:%s exec failed since %s, start to cleanup", pStep->name, terrstr()); mError("step:%s exec failed since %s, start to cleanup", pStep->name, terrstr());
mndCleanupSteps(pMnode, pos); mndCleanupSteps(pMnode, pos);
terrno = code;
return -1; return -1;
} else { } else {
mDebug("step:%s is initialized", pStep->name); mDebug("step:%s is initialized", pStep->name);
......
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "sdbInt.h" #include "sdbInt.h"
static int32_t sdbCreateDir(SSdb *pSdb);
SSdb *sdbInit(SSdbOpt *pOption) { SSdb *sdbInit(SSdbOpt *pOption) {
mDebug("start to init sdb in %s", pOption->path); mDebug("start to init sdb in %s", pOption->path);
...@@ -40,6 +42,11 @@ SSdb *sdbInit(SSdbOpt *pOption) { ...@@ -40,6 +42,11 @@ SSdb *sdbInit(SSdbOpt *pOption) {
return NULL; return NULL;
} }
if (sdbCreateDir(pSdb) != 0) {
sdbCleanup(pSdb);
return NULL;
}
for (ESdbType i = 0; i < SDB_MAX; ++i) { for (ESdbType i = 0; i < SDB_MAX; ++i) {
taosInitRWLatch(&pSdb->locks[i]); taosInitRWLatch(&pSdb->locks[i]);
} }
...@@ -53,8 +60,8 @@ void sdbCleanup(SSdb *pSdb) { ...@@ -53,8 +60,8 @@ void sdbCleanup(SSdb *pSdb) {
mDebug("start to cleanup sdb"); mDebug("start to cleanup sdb");
// if (pSdb->curVer != pSdb->lastCommitVer) { // if (pSdb->curVer != pSdb->lastCommitVer) {
mDebug("write sdb file for curVer:% " PRId64 " and lastVer:%" PRId64, pSdb->curVer, pSdb->lastCommitVer); mDebug("write sdb file for curVer:% " PRId64 " and lastVer:%" PRId64, pSdb->curVer, pSdb->lastCommitVer);
sdbWriteFile(pSdb); sdbWriteFile(pSdb);
// } // }
if (pSdb->currDir != NULL) { if (pSdb->currDir != NULL) {
...@@ -133,4 +140,26 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) { ...@@ -133,4 +140,26 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) {
mDebug("sdb table:%d is initialized", sdbType); mDebug("sdb table:%d is initialized", sdbType);
return 0; return 0;
} }
\ No newline at end of file
static int32_t sdbCreateDir(SSdb *pSdb) {
if (taosMkDir(pSdb->currDir) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to create dir:%s since %s", pSdb->currDir, terrstr());
return -1;
}
if (taosMkDir(pSdb->syncDir) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to create dir:%s since %s", pSdb->syncDir, terrstr());
return -1;
}
if (taosMkDir(pSdb->tmpDir) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to create dir:%s since %s", pSdb->tmpDir, terrstr());
return -1;
}
return 0;
}
...@@ -17,28 +17,6 @@ ...@@ -17,28 +17,6 @@
#include "sdbInt.h" #include "sdbInt.h"
#include "tchecksum.h" #include "tchecksum.h"
static int32_t sdbCreateDir(SSdb *pSdb) {
if (taosMkDir(pSdb->currDir) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to create dir:%s since %s", pSdb->currDir, terrstr());
return -1;
}
if (taosMkDir(pSdb->syncDir) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to create dir:%s since %s", pSdb->syncDir, terrstr());
return -1;
}
if (taosMkDir(pSdb->tmpDir) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to create dir:%s since %s", pSdb->tmpDir, terrstr());
return -1;
}
return 0;
}
static int32_t sdbRunDeployFp(SSdb *pSdb) { static int32_t sdbRunDeployFp(SSdb *pSdb) {
mDebug("start to deploy sdb"); mDebug("start to deploy sdb");
...@@ -77,7 +55,7 @@ int32_t sdbReadFile(SSdb *pSdb) { ...@@ -77,7 +55,7 @@ int32_t sdbReadFile(SSdb *pSdb) {
free(pRaw); free(pRaw);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to read file:%s since %s", file, terrstr()); mError("failed to read file:%s since %s", file, terrstr());
return -1; return 0;
} }
while (1) { while (1) {
...@@ -225,10 +203,6 @@ int32_t sdbWriteFile(SSdb *pSdb) { ...@@ -225,10 +203,6 @@ int32_t sdbWriteFile(SSdb *pSdb) {
} }
int32_t sdbDeploy(SSdb *pSdb) { int32_t sdbDeploy(SSdb *pSdb) {
if (sdbCreateDir(pSdb) != 0) {
return -1;
}
if (sdbRunDeployFp(pSdb) != 0) { if (sdbRunDeployFp(pSdb) != 0) {
return -1; return -1;
} }
......
...@@ -38,6 +38,7 @@ int32_t tWorkerInit(SWorkerPool *pool) { ...@@ -38,6 +38,7 @@ int32_t tWorkerInit(SWorkerPool *pool) {
void tWorkerCleanup(SWorkerPool *pool) { void tWorkerCleanup(SWorkerPool *pool) {
for (int i = 0; i < pool->max; ++i) { for (int i = 0; i < pool->max; ++i) {
SWorker *worker = pool->workers + i; SWorker *worker = pool->workers + i;
if (worker == NULL) continue;
if (taosCheckPthreadValid(worker->thread)) { if (taosCheckPthreadValid(worker->thread)) {
taosQsetThreadResume(pool->qset); taosQsetThreadResume(pool->qset);
} }
...@@ -45,6 +46,7 @@ void tWorkerCleanup(SWorkerPool *pool) { ...@@ -45,6 +46,7 @@ void tWorkerCleanup(SWorkerPool *pool) {
for (int i = 0; i < pool->max; ++i) { for (int i = 0; i < pool->max; ++i) {
SWorker *worker = pool->workers + i; SWorker *worker = pool->workers + i;
if (worker == NULL) continue;
if (taosCheckPthreadValid(worker->thread)) { if (taosCheckPthreadValid(worker->thread)) {
pthread_join(worker->thread, NULL); pthread_join(worker->thread, NULL);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册