提交 34619844 编写于 作者: S Shengliang Guan

fix bug while exec trans

上级 bb1d721e
...@@ -86,5 +86,6 @@ SpacesInSquareBrackets: false ...@@ -86,5 +86,6 @@ SpacesInSquareBrackets: false
Standard: Auto Standard: Auto
TabWidth: 8 TabWidth: 8
UseTab: Never UseTab: Never
AlignConsecutiveDeclarations: true
... ...
...@@ -909,36 +909,29 @@ typedef struct SShowRsp { ...@@ -909,36 +909,29 @@ typedef struct SShowRsp {
typedef struct { typedef struct {
char ep[TSDB_EP_LEN]; // end point, hostname:port char ep[TSDB_EP_LEN]; // end point, hostname:port
int32_t reserve[8];
} SCreateDnodeMsg; } SCreateDnodeMsg;
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
int32_t reserve[8];
} SDropDnodeMsg; } SDropDnodeMsg;
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
char config[TSDB_DNODE_CONFIG_LEN]; char config[TSDB_DNODE_CONFIG_LEN];
int32_t reserve[8];
} SCfgDnodeMsg; } SCfgDnodeMsg;
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
int32_t reserve[8];
} SCreateMnodeMsg, SDropMnodeMsg; } SCreateMnodeMsg, SDropMnodeMsg;
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
int8_t align[3];
int8_t replica; int8_t replica;
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
int32_t reserve[8];
} SCreateMnodeInMsg, SAlterMnodeInMsg; } SCreateMnodeInMsg, SAlterMnodeInMsg;
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
int32_t reserve[8];
} SDropMnodeInMsg; } SDropMnodeInMsg;
typedef struct { typedef struct {
......
...@@ -349,7 +349,7 @@ static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) { ...@@ -349,7 +349,7 @@ static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) {
SReplica *pReplica = &pOption->replicas[0]; SReplica *pReplica = &pOption->replicas[0];
pReplica->id = 1; pReplica->id = 1;
pReplica->port = pDnode->opt.serverPort; pReplica->port = pDnode->opt.serverPort;
tstrncpy(pReplica->fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN); memcpy(pReplica->fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN);
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->selfIndex = pOption->selfIndex; pMgmt->selfIndex = pOption->selfIndex;
...@@ -376,7 +376,7 @@ static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SC ...@@ -376,7 +376,7 @@ static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SC
SReplica *pReplica = &pOption->replicas[i]; SReplica *pReplica = &pOption->replicas[i];
pReplica->id = pMsg->replicas[i].id; pReplica->id = pMsg->replicas[i].id;
pReplica->port = pMsg->replicas[i].port; pReplica->port = pMsg->replicas[i].port;
tstrncpy(pReplica->fqdn, pMsg->replicas[i].fqdn, TSDB_FQDN_LEN); memcpy(pReplica->fqdn, pMsg->replicas[i].fqdn, TSDB_FQDN_LEN);
if (pReplica->id == pOption->dnodeId) { if (pReplica->id == pOption->dnodeId) {
pOption->selfIndex = i; pOption->selfIndex = i;
} }
...@@ -499,7 +499,7 @@ static SCreateMnodeInMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) { ...@@ -499,7 +499,7 @@ static SCreateMnodeInMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) {
} }
static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
SCreateMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg->pCont); SCreateMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg);
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_MNODE_ID_INVALID; terrno = TSDB_CODE_DND_MNODE_ID_INVALID;
...@@ -515,18 +515,23 @@ static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { ...@@ -515,18 +515,23 @@ static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
} }
static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
SAlterMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg->pCont); SAlterMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg);
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_MNODE_ID_INVALID; terrno = TSDB_CODE_DND_MNODE_ID_INVALID;
return -1; return -1;
} else {
SMnodeOpt option = {0};
if (dndBuildMnodeOptionFromMsg(pDnode, &option, pMsg) != 0) {
return -1;
}
return dndAlterMnode(pDnode, &option);
} }
SMnodeOpt option = {0};
if (dndBuildMnodeOptionFromMsg(pDnode, &option, pMsg) != 0) {
return -1;
}
if (dndAlterMnode(pDnode, &option) != 0) {
return -1;
}
return dndWriteMnodeFile(pDnode);
} }
static int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { static int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
...@@ -555,16 +560,17 @@ static void dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { ...@@ -555,16 +560,17 @@ static void dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
code = dndProcessDropMnodeReq(pDnode, pMsg); code = dndProcessDropMnodeReq(pDnode, pMsg);
break; break;
default: default:
code = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
code = -1;
break; break;
} }
if (pMsg->msgType & 1u) { if (pMsg->msgType & 1u) {
if (code != 0) code = terrno;
SRpcMsg rsp = {.code = code, .handle = pMsg->handle}; SRpcMsg rsp = {.code = code, .handle = pMsg->handle};
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
} }
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
...@@ -625,8 +631,6 @@ static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg) { ...@@ -625,8 +631,6 @@ static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
} }
static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg) { static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg) {
assert(pQueue);
SMnodeMsg *pMsg = mndInitMsg(pMnode, pRpcMsg); SMnodeMsg *pMsg = mndInitMsg(pMnode, pRpcMsg);
if (pMsg == NULL) { if (pMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -647,13 +651,14 @@ void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) { ...@@ -647,13 +651,14 @@ void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) {
SMnode *pMnode = dndAcquireMnode(pDnode); SMnode *pMnode = dndAcquireMnode(pDnode);
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg)); SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg));
if (pMsg != NULL) *pMsg = *pRpcMsg;
if (pMsg == NULL || taosWriteQitem(pMgmt->pMgmtQ, pMsg) != 0) { if (pMsg == NULL || taosWriteQitem(pMgmt->pMgmtQ, pMsg) != 0) {
if (pRpcMsg->msgType & 1u) { if (pRpcMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY}; SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY};
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
} }
rpcFreeCont(pRpcMsg->pCont); rpcFreeCont(pRpcMsg->pCont);
pRpcMsg->pCont = NULL;
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
} }
...@@ -894,6 +899,11 @@ int32_t dndInitMnode(SDnode *pDnode) { ...@@ -894,6 +899,11 @@ int32_t dndInitMnode(SDnode *pDnode) {
return -1; return -1;
} }
if (dndAllocMnodeMgmtQueue(pDnode) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
char path[PATH_MAX]; char path[PATH_MAX];
snprintf(path, PATH_MAX, "%s/mnode.json", pDnode->dir.dnode); snprintf(path, PATH_MAX, "%s/mnode.json", pDnode->dir.dnode);
pMgmt->file = strdup(path); pMgmt->file = strdup(path);
...@@ -937,6 +947,7 @@ void dndCleanupMnode(SDnode *pDnode) { ...@@ -937,6 +947,7 @@ void dndCleanupMnode(SDnode *pDnode) {
dInfo("dnode-mnode start to clean up"); dInfo("dnode-mnode start to clean up");
dndStopMnodeWorker(pDnode); dndStopMnodeWorker(pDnode);
dndCleanupMnodeMgmtWorker(pDnode); dndCleanupMnodeMgmtWorker(pDnode);
dndFreeMnodeMgmtQueue(pDnode);
tfree(pMgmt->file); tfree(pMgmt->file);
mndClose(pMgmt->pMnode); mndClose(pMgmt->pMnode);
dInfo("dnode-mnode is cleaned up"); dInfo("dnode-mnode is cleaned up");
......
...@@ -70,187 +70,209 @@ TEST_F(DndTestMnode, 01_ShowDnode) { ...@@ -70,187 +70,209 @@ TEST_F(DndTestMnode, 01_ShowDnode) {
CheckTimestamp(); CheckTimestamp();
} }
#if 0 TEST_F(DndTestMnode, 02_Create_Mnode_Invalid_Id) {
TEST_F(DndTestMnode, 02_ConfigDnode) {
int32_t contLen = sizeof(SCfgDnodeMsg);
SCfgDnodeMsg* pReq = (SCfgDnodeMsg*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
strcpy(pReq->config, "ddebugflag 131");
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CONFIG_DNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
TEST_F(DndTestMnode, 03_Create_Drop_Restart_Dnode) {
{ {
int32_t contLen = sizeof(SCreateDnodeMsg); int32_t contLen = sizeof(SCreateMnodeMsg);
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); SCreateMnodeMsg* pReq = (SCreateMnodeMsg*)rpcMallocCont(contLen);
strcpy(pReq->ep, "localhost:9062"); pReq->dnodeId = htonl(1);
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen); SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr); ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0); ASSERT_EQ(pMsg->code, TSDB_CODE_MND_MNODE_ALREADY_EXIST);
} }
}
taosMsleep(1300); TEST_F(DndTestMnode, 03_Create_Mnode_Invalid_Id) {
test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "");
CHECK_META("show dnodes", 7);
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 2);
CheckInt16(1);
CheckInt16(2);
CheckBinary("localhost:9061", TSDB_EP_LEN);
CheckBinary("localhost:9062", TSDB_EP_LEN);
CheckInt16(0);
CheckInt16(0);
CheckInt16(1);
CheckInt16(1);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckTimestamp();
CheckTimestamp();
CheckBinary("", 24);
CheckBinary("", 24);
{ {
int32_t contLen = sizeof(SDropDnodeMsg); int32_t contLen = sizeof(SCreateMnodeMsg);
SDropDnodeMsg* pReq = (SDropDnodeMsg*)rpcMallocCont(contLen); SCreateMnodeMsg* pReq = (SCreateMnodeMsg*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2); pReq->dnodeId = htonl(2);
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_DROP_DNODE, pReq, contLen); SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr); ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0); ASSERT_EQ(pMsg->code, TSDB_CODE_MND_DNODE_NOT_EXIST);
} }
}
test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, ""); TEST_F(DndTestMnode, 04_Create_Mnode) {
CHECK_META("show dnodes", 7);
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 1);
CheckInt16(1);
CheckBinary("localhost:9061", TSDB_EP_LEN);
CheckInt16(0);
CheckInt16(1);
CheckBinary("ready", 10);
CheckTimestamp();
CheckBinary("", 24);
{ {
// create dnode
int32_t contLen = sizeof(SCreateDnodeMsg); int32_t contLen = sizeof(SCreateDnodeMsg);
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
strcpy(pReq->ep, "localhost:9063"); strcpy(pReq->ep, "localhost:9062");
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen); SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr); ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0); ASSERT_EQ(pMsg->code, 0);
}
{ taosMsleep(1300);
int32_t contLen = sizeof(SCreateDnodeMsg); test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "");
test.SendShowRetrieveMsg();
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); EXPECT_EQ(test.GetShowRows(), 2);
strcpy(pReq->ep, "localhost:9064");
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
} }
{ {
int32_t contLen = sizeof(SCreateDnodeMsg); // create mnode
int32_t contLen = sizeof(SCreateMnodeMsg);
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); SCreateMnodeMsg* pReq = (SCreateMnodeMsg*)rpcMallocCont(contLen);
strcpy(pReq->ep, "localhost:9065"); pReq->dnodeId = htonl(2);
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen); SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr); ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0); ASSERT_EQ(pMsg->code, 0);
}
taosMsleep(1300); test.SendShowMetaMsg(TSDB_MGMT_TABLE_MNODE, "");
test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, ""); test.SendShowRetrieveMsg();
CHECK_META("show dnodes", 7); EXPECT_EQ(test.GetShowRows(), 2);
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 4); CheckInt16(1);
CheckInt16(2);
CheckInt16(1); CheckBinary("localhost:9061", TSDB_EP_LEN);
CheckInt16(3); CheckBinary("localhost:9062", TSDB_EP_LEN);
CheckInt16(4); CheckBinary("master", 12);
CheckInt16(5); CheckBinary("slave", 12);
CheckBinary("localhost:9061", TSDB_EP_LEN); CheckInt64(0);
CheckBinary("localhost:9063", TSDB_EP_LEN); CheckInt64(0);
CheckBinary("localhost:9064", TSDB_EP_LEN); CheckTimestamp();
CheckBinary("localhost:9065", TSDB_EP_LEN); CheckTimestamp();
CheckInt16(0); }
CheckInt16(0);
CheckInt16(0);
CheckInt16(0);
CheckInt16(1);
CheckInt16(1);
CheckInt16(1);
CheckInt16(1);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckTimestamp();
CheckTimestamp();
CheckTimestamp();
CheckTimestamp();
CheckBinary("", 24);
CheckBinary("", 24);
CheckBinary("", 24);
CheckBinary("", 24);
// restart
uInfo("stop all server");
test.Restart();
server2.Restart();
server3.Restart();
server4.Restart();
server5.Restart();
taosMsleep(1300);
test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "");
CHECK_META("show dnodes", 7);
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 4);
CheckInt16(1);
CheckInt16(3);
CheckInt16(4);
CheckInt16(5);
CheckBinary("localhost:9061", TSDB_EP_LEN);
CheckBinary("localhost:9063", TSDB_EP_LEN);
CheckBinary("localhost:9064", TSDB_EP_LEN);
CheckBinary("localhost:9065", TSDB_EP_LEN);
CheckInt16(0);
CheckInt16(0);
CheckInt16(0);
CheckInt16(0);
CheckInt16(1);
CheckInt16(1);
CheckInt16(1);
CheckInt16(1);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckBinary("ready", 10);
CheckTimestamp();
CheckTimestamp();
CheckTimestamp();
CheckTimestamp();
CheckBinary("", 24);
CheckBinary("", 24);
CheckBinary("", 24);
CheckBinary("", 24);
} }
// {
#endif // int32_t contLen = sizeof(SDropDnodeMsg);
\ No newline at end of file
// SDropDnodeMsg* pReq = (SDropDnodeMsg*)rpcMallocCont(contLen);
// pReq->dnodeId = htonl(2);
// SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_DROP_DNODE, pReq, contLen);
// ASSERT_NE(pMsg, nullptr);
// ASSERT_EQ(pMsg->code, 0);
// }
// test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "");
// CHECK_META("show dnodes", 7);
// test.SendShowRetrieveMsg();
// EXPECT_EQ(test.GetShowRows(), 1);
// CheckInt16(1);
// CheckBinary("localhost:9061", TSDB_EP_LEN);
// CheckInt16(0);
// CheckInt16(1);
// CheckBinary("ready", 10);
// CheckTimestamp();
// CheckBinary("", 24);
// {
// int32_t contLen = sizeof(SCreateDnodeMsg);
// SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
// strcpy(pReq->ep, "localhost:9063");
// SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen);
// ASSERT_NE(pMsg, nullptr);
// ASSERT_EQ(pMsg->code, 0);
// }
// {
// int32_t contLen = sizeof(SCreateDnodeMsg);
// SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
// strcpy(pReq->ep, "localhost:9064");
// SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen);
// ASSERT_NE(pMsg, nullptr);
// ASSERT_EQ(pMsg->code, 0);
// }
// {
// int32_t contLen = sizeof(SCreateDnodeMsg);
// SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
// strcpy(pReq->ep, "localhost:9065");
// SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen);
// ASSERT_NE(pMsg, nullptr);
// ASSERT_EQ(pMsg->code, 0);
// }
// taosMsleep(1300);
// test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "");
// CHECK_META("show dnodes", 7);
// test.SendShowRetrieveMsg();
// EXPECT_EQ(test.GetShowRows(), 4);
// CheckInt16(1);
// CheckInt16(3);
// CheckInt16(4);
// CheckInt16(5);
// CheckBinary("localhost:9061", TSDB_EP_LEN);
// CheckBinary("localhost:9063", TSDB_EP_LEN);
// CheckBinary("localhost:9064", TSDB_EP_LEN);
// CheckBinary("localhost:9065", TSDB_EP_LEN);
// CheckInt16(0);
// CheckInt16(0);
// CheckInt16(0);
// CheckInt16(0);
// CheckInt16(1);
// CheckInt16(1);
// CheckInt16(1);
// CheckInt16(1);
// CheckBinary("ready", 10);
// CheckBinary("ready", 10);
// CheckBinary("ready", 10);
// CheckBinary("ready", 10);
// CheckTimestamp();
// CheckTimestamp();
// CheckTimestamp();
// CheckTimestamp();
// CheckBinary("", 24);
// CheckBinary("", 24);
// CheckBinary("", 24);
// CheckBinary("", 24);
// // restart
// uInfo("stop all server");
// test.Restart();
// server2.Restart();
// server3.Restart();
// server4.Restart();
// server5.Restart();
// taosMsleep(1300);
// test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "");
// CHECK_META("show dnodes", 7);
// test.SendShowRetrieveMsg();
// EXPECT_EQ(test.GetShowRows(), 4);
// CheckInt16(1);
// CheckInt16(3);
// CheckInt16(4);
// CheckInt16(5);
// CheckBinary("localhost:9061", TSDB_EP_LEN);
// CheckBinary("localhost:9063", TSDB_EP_LEN);
// CheckBinary("localhost:9064", TSDB_EP_LEN);
// CheckBinary("localhost:9065", TSDB_EP_LEN);
// CheckInt16(0);
// CheckInt16(0);
// CheckInt16(0);
// CheckInt16(0);
// CheckInt16(1);
// CheckInt16(1);
// CheckInt16(1);
// CheckInt16(1);
// CheckBinary("ready", 10);
// CheckBinary("ready", 10);
// CheckBinary("ready", 10);
// CheckBinary("ready", 10);
// CheckTimestamp();
// CheckTimestamp();
// CheckTimestamp();
// CheckTimestamp();
// CheckBinary("", 24);
// CheckBinary("", 24);
// CheckBinary("", 24);
// CheckBinary("", 24);
// }
...@@ -31,6 +31,7 @@ static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOldMnode, SMnodeObj ...@@ -31,6 +31,7 @@ static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOldMnode, SMnodeObj
static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg); static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg);
static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg); static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg);
static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pMsg); static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pMsg);
static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pMsg);
static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pMsg); static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pMsg);
static int32_t mndGetMnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); static int32_t mndGetMnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndRetrieveMnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); static int32_t mndRetrieveMnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
...@@ -49,6 +50,7 @@ int32_t mndInitMnode(SMnode *pMnode) { ...@@ -49,6 +50,7 @@ int32_t mndInitMnode(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_MNODE, mndProcessCreateMnodeReq); mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_MNODE, mndProcessCreateMnodeReq);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_MNODE, mndProcessDropMnodeReq); mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_MNODE, mndProcessDropMnodeReq);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP, mndProcessCreateMnodeRsp); mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP, mndProcessCreateMnodeRsp);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_ALTER_MNODE_IN_RSP, mndProcessAlterMnodeRsp);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_MNODE_IN_RSP, mndProcessDropMnodeRsp); mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_MNODE_IN_RSP, mndProcessDropMnodeRsp);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndGetMnodeMeta); mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndGetMnodeMeta);
...@@ -270,6 +272,8 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno ...@@ -270,6 +272,8 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
memcpy(pReplica->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); memcpy(pReplica->fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
numOfReplicas++; numOfReplicas++;
createMsg.replica = numOfReplicas;
while (1) { while (1) {
SMnodeObj *pMObj = NULL; SMnodeObj *pMObj = NULL;
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
...@@ -382,7 +386,7 @@ static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) { ...@@ -382,7 +386,7 @@ static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) {
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId); SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId);
if (pDnode == NULL) { if (pDnode == NULL) {
mError("mnode:%d, dnode not exist", pDnode->id); mError("mnode:%d, dnode not exist", pCreate->dnodeId);
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
return -1; return -1;
} }
...@@ -560,9 +564,20 @@ static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) { ...@@ -560,9 +564,20 @@ static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg);
return 0;
}
static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg);
return 0;
}
static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg);
return 0;
}
static int32_t mndGetMnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { static int32_t mndGetMnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
......
...@@ -622,7 +622,7 @@ void mndTransHandleActionRsp(SMnodeMsg *pMsg) { ...@@ -622,7 +622,7 @@ void mndTransHandleActionRsp(SMnodeMsg *pMsg) {
STransAction *pAction = taosArrayGet(pArray, action); STransAction *pAction = taosArrayGet(pArray, action);
if (pAction != NULL) { if (pAction != NULL) {
pAction->msgReceived = 1; pAction->msgReceived = 1;
pAction->errCode = pMsg->code; pAction->errCode = pMsg->rpcMsg.code;
} }
mDebug("trans:%d, action:%d response is received, code:0x%x", transId, action, pMsg->code); mDebug("trans:%d, action:%d response is received, code:0x%x", transId, action, pMsg->code);
......
...@@ -178,8 +178,10 @@ static int32_t mndExecSteps(SMnode *pMnode) { ...@@ -178,8 +178,10 @@ static int32_t mndExecSteps(SMnode *pMnode) {
// (*pMnode->reportProgress)(pStep->name, "start initialize"); // (*pMnode->reportProgress)(pStep->name, "start initialize");
if ((*pStep->initFp)(pMnode) != 0) { if ((*pStep->initFp)(pMnode) != 0) {
int32_t code = terrno;
mError("step:%s exec failed since %s, start to cleanup", pStep->name, terrstr()); mError("step:%s exec failed since %s, start to cleanup", pStep->name, terrstr());
mndCleanupSteps(pMnode, pos); mndCleanupSteps(pMnode, pos);
terrno = code;
return -1; return -1;
} else { } else {
mDebug("step:%s is initialized", pStep->name); mDebug("step:%s is initialized", pStep->name);
......
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "sdbInt.h" #include "sdbInt.h"
static int32_t sdbCreateDir(SSdb *pSdb);
SSdb *sdbInit(SSdbOpt *pOption) { SSdb *sdbInit(SSdbOpt *pOption) {
mDebug("start to init sdb in %s", pOption->path); mDebug("start to init sdb in %s", pOption->path);
...@@ -40,6 +42,11 @@ SSdb *sdbInit(SSdbOpt *pOption) { ...@@ -40,6 +42,11 @@ SSdb *sdbInit(SSdbOpt *pOption) {
return NULL; return NULL;
} }
if (sdbCreateDir(pSdb) != 0) {
sdbCleanup(pSdb);
return NULL;
}
for (ESdbType i = 0; i < SDB_MAX; ++i) { for (ESdbType i = 0; i < SDB_MAX; ++i) {
taosInitRWLatch(&pSdb->locks[i]); taosInitRWLatch(&pSdb->locks[i]);
} }
...@@ -53,8 +60,8 @@ void sdbCleanup(SSdb *pSdb) { ...@@ -53,8 +60,8 @@ void sdbCleanup(SSdb *pSdb) {
mDebug("start to cleanup sdb"); mDebug("start to cleanup sdb");
// if (pSdb->curVer != pSdb->lastCommitVer) { // if (pSdb->curVer != pSdb->lastCommitVer) {
mDebug("write sdb file for curVer:% " PRId64 " and lastVer:%" PRId64, pSdb->curVer, pSdb->lastCommitVer); mDebug("write sdb file for curVer:% " PRId64 " and lastVer:%" PRId64, pSdb->curVer, pSdb->lastCommitVer);
sdbWriteFile(pSdb); sdbWriteFile(pSdb);
// } // }
if (pSdb->currDir != NULL) { if (pSdb->currDir != NULL) {
...@@ -133,4 +140,26 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) { ...@@ -133,4 +140,26 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) {
mDebug("sdb table:%d is initialized", sdbType); mDebug("sdb table:%d is initialized", sdbType);
return 0; return 0;
} }
\ No newline at end of file
static int32_t sdbCreateDir(SSdb *pSdb) {
if (taosMkDir(pSdb->currDir) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to create dir:%s since %s", pSdb->currDir, terrstr());
return -1;
}
if (taosMkDir(pSdb->syncDir) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to create dir:%s since %s", pSdb->syncDir, terrstr());
return -1;
}
if (taosMkDir(pSdb->tmpDir) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to create dir:%s since %s", pSdb->tmpDir, terrstr());
return -1;
}
return 0;
}
...@@ -17,28 +17,6 @@ ...@@ -17,28 +17,6 @@
#include "sdbInt.h" #include "sdbInt.h"
#include "tchecksum.h" #include "tchecksum.h"
static int32_t sdbCreateDir(SSdb *pSdb) {
if (taosMkDir(pSdb->currDir) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to create dir:%s since %s", pSdb->currDir, terrstr());
return -1;
}
if (taosMkDir(pSdb->syncDir) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to create dir:%s since %s", pSdb->syncDir, terrstr());
return -1;
}
if (taosMkDir(pSdb->tmpDir) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to create dir:%s since %s", pSdb->tmpDir, terrstr());
return -1;
}
return 0;
}
static int32_t sdbRunDeployFp(SSdb *pSdb) { static int32_t sdbRunDeployFp(SSdb *pSdb) {
mDebug("start to deploy sdb"); mDebug("start to deploy sdb");
...@@ -77,7 +55,7 @@ int32_t sdbReadFile(SSdb *pSdb) { ...@@ -77,7 +55,7 @@ int32_t sdbReadFile(SSdb *pSdb) {
free(pRaw); free(pRaw);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to read file:%s since %s", file, terrstr()); mError("failed to read file:%s since %s", file, terrstr());
return -1; return 0;
} }
while (1) { while (1) {
...@@ -225,10 +203,6 @@ int32_t sdbWriteFile(SSdb *pSdb) { ...@@ -225,10 +203,6 @@ int32_t sdbWriteFile(SSdb *pSdb) {
} }
int32_t sdbDeploy(SSdb *pSdb) { int32_t sdbDeploy(SSdb *pSdb) {
if (sdbCreateDir(pSdb) != 0) {
return -1;
}
if (sdbRunDeployFp(pSdb) != 0) { if (sdbRunDeployFp(pSdb) != 0) {
return -1; return -1;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册