From ed2254dbcc13a1e578fba06ce371549358c679c3 Mon Sep 17 00:00:00 2001 From: slguan Date: Fri, 10 Apr 2020 14:09:50 +0800 Subject: [PATCH] [TD-114] let create table wait until master selected --- src/inc/mnode.h | 5 +++- src/inc/taosmsg.h | 1 - src/mnode/inc/mgmtShell.h | 1 + src/mnode/inc/mgmtVgroup.h | 1 + src/mnode/src/mgmtDnode.c | 13 +++++---- src/mnode/src/mgmtProfile.c | 2 ++ src/mnode/src/mgmtShell.c | 9 +++++++ src/mnode/src/mgmtTable.c | 47 ++++++++++++++++++++++----------- src/mnode/src/mgmtVgroup.c | 13 +++++++++ src/vnode/main/src/vnodeMain.c | 1 - src/vnode/main/src/vnodeWrite.c | 3 ++- tests/script/tmp/prepare.sim | 3 ++- 12 files changed, 73 insertions(+), 26 deletions(-) diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 903b172068..dec9292209 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -149,7 +149,8 @@ typedef struct _vg_obj { int32_t lbDnodeId; int32_t lbTime; int8_t status; - int8_t reserved[14]; + int8_t inUse; + int8_t reserved[13]; int8_t updateEnd[1]; int32_t refCount; struct _vg_obj *prev, *next; @@ -243,6 +244,8 @@ typedef struct { int8_t received; int8_t successed; int8_t expected; + int8_t retry; + int8_t maxRetry; int32_t contLen; int32_t code; void *ahandle; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 5f07d6ec99..f5168a2c9e 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -510,7 +510,6 @@ typedef struct SRetrieveTableRsp { typedef struct { int32_t vgId; - int32_t vnode; int64_t totalStorage; int64_t compStorage; int64_t pointsWritten; diff --git a/src/mnode/inc/mgmtShell.h b/src/mnode/inc/mgmtShell.h index b92e9de1f4..171c93a390 100644 --- a/src/mnode/inc/mgmtShell.h +++ b/src/mnode/inc/mgmtShell.h @@ -31,6 +31,7 @@ void mgmtAddShellShowMetaHandle(uint8_t showType, SShowMetaFp fp); void mgmtAddShellShowRetrieveHandle(uint8_t showType, SShowRetrieveFp fp); void mgmtAddToShellQueue(SQueuedMsg *queuedMsg); +void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg); void mgmtSendSimpleResp(void *thandle, int32_t code); #ifdef __cplusplus diff --git a/src/mnode/inc/mgmtVgroup.h b/src/mnode/inc/mgmtVgroup.h index 83e003e063..072c616f3d 100644 --- a/src/mnode/inc/mgmtVgroup.h +++ b/src/mnode/inc/mgmtVgroup.h @@ -37,6 +37,7 @@ void mgmtDropAllVgroups(SDbObj *pDropDb); void * mgmtGetNextVgroup(void *pNode, SVgObj **pVgroup); void mgmtUpdateVgroup(SVgObj *pVgroup); +void mgmtUpdateVgroupStatus(SVgObj *pVgroup, int32_t dnodeId, SVnodeLoad *pVload); void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb); void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle); diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index e4ab114090..2f4abc5cfe 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -189,18 +189,21 @@ void clusterProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { int32_t openVnodes = htons(pStatus->openVnodes); for (int32_t j = 0; j < openVnodes; ++j) { - pDnode->vload[j].vgId = htonl(pStatus->load[j].vgId); - pDnode->vload[j].totalStorage = htobe64(pStatus->load[j].totalStorage); - pDnode->vload[j].compStorage = htobe64(pStatus->load[j].compStorage); - pDnode->vload[j].pointsWritten = htobe64(pStatus->load[j].pointsWritten); + SVnodeLoad *pVload = &pStatus->load[j]; + pDnode->vload[j].vgId = htonl(pVload->vgId); + pDnode->vload[j].totalStorage = htobe64(pVload->totalStorage); + pDnode->vload[j].compStorage = htobe64(pVload->compStorage); + pDnode->vload[j].pointsWritten = htobe64(pVload->pointsWritten); SVgObj *pVgroup = mgmtGetVgroup(pDnode->vload[j].vgId); if (pVgroup == NULL) { SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->privateIp); mPrint("dnode:%d, vgroup:%d not exist in mnode, drop it", pDnode->dnodeId, pDnode->vload[j].vgId); mgmtSendDropVnodeMsg(pDnode->vload[j].vgId, &ipSet, NULL); + } else { + mgmtUpdateVgroupStatus(pVgroup, pDnode->dnodeId, pVload); + mgmtReleaseVgroup(pVgroup); } - mgmtReleaseVgroup(pVgroup); } if (pDnode->status == TAOS_DN_STATUS_OFFLINE) { diff --git a/src/mnode/src/mgmtProfile.c b/src/mnode/src/mgmtProfile.c index 0360432971..2b22fae47a 100644 --- a/src/mnode/src/mgmtProfile.c +++ b/src/mnode/src/mgmtProfile.c @@ -806,6 +806,8 @@ void* mgmtCloneQueuedMsg(SQueuedMsg *pSrcMsg) { pDestMsg->msgType = pSrcMsg->msgType; pDestMsg->pCont = pSrcMsg->pCont; pDestMsg->contLen = pSrcMsg->contLen; + pDestMsg->retry = pSrcMsg->retry; + pDestMsg->maxRetry= pSrcMsg->maxRetry; pDestMsg->pUser = pSrcMsg->pUser; pDestMsg->usePublicIp = pSrcMsg->usePublicIp; diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 7b6a2654ae..880c6d0c10 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -128,6 +128,15 @@ void mgmtAddToShellQueue(SQueuedMsg *queuedMsg) { taosScheduleTask(tsMgmtTranQhandle, &schedMsg); } +static void mgmtDoDealyedAddToShellQueue(void *param, void *tmrId) { + mgmtAddToShellQueue(param); +} + +void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg) { + void *unUsed = NULL; + taosTmrReset(mgmtDoDealyedAddToShellQueue, 1000, queuedMsg, tsMgmtTmr, &unUsed); +} + static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { if (rpcMsg == NULL || rpcMsg->pCont == NULL) { return; diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index a14bdd058f..b536bb5ac9 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -540,7 +540,7 @@ static void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { SCMCreateTableMsg *pCreate = pMsg->pCont; pMsg->pTable = mgmtGetTable(pCreate->tableId); - if (pMsg->pTable != NULL) { + if (pMsg->pTable != NULL && pMsg->retry == 0) { if (pCreate->igExists) { mTrace("table:%s, is already exist", pCreate->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS); @@ -1300,7 +1300,11 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) { return; } - pMsg->pTable = (STableObj *)mgmtDoCreateChildTable(pCreate, pVgroup, sid); + if (pMsg->retry == 0) { + pMsg->pTable = (STableObj *)mgmtDoCreateChildTable(pCreate, pVgroup, sid); + } else { + pMsg->pTable = mgmtGetTable(pCreate->tableId); + } if (pMsg->pTable == NULL) { mgmtSendSimpleResp(pMsg->thandle, terrno); return; @@ -1315,6 +1319,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) { SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg); newMsg->ahandle = pMsg->pTable; + newMsg->maxRetry = 5; mgmtIncTableRef(pMsg->pTable); SRpcMsg rpcMsg = { .handle = newMsg, @@ -1737,30 +1742,40 @@ static void mgmtProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { queueMsg->received++; SChildTableObj *pTable = queueMsg->ahandle; - mTrace("table:%s, create table rsp received, thandle:%p ahandle:%p result:%s", pTable->info.tableId, queueMsg->thandle, - rpcMsg->handle, tstrerror(rpcMsg->code)); + mTrace("table:%s, create table rsp received, thandle:%p result:%s", pTable->info.tableId, queueMsg->thandle, + tstrerror(rpcMsg->code)); if (rpcMsg->code != TSDB_CODE_SUCCESS) { - SSdbOperDesc oper = { - .type = SDB_OPER_TYPE_GLOBAL, - .table = tsChildTableSdb, - .pObj = pTable - }; - sdbDeleteRow(&oper); + if (queueMsg->retry++ < queueMsg->maxRetry) { + mTrace("table:%s, create table rsp received, retry:%d thandle:%p result:%s", pTable->info.tableId, + queueMsg->retry, queueMsg->thandle, tstrerror(rpcMsg->code)); + mgmtDealyedAddToShellQueue(queueMsg); + } else { + mError("table:%s, failed to create in dnode, thandle:%p result:%s", pTable->info.tableId, + queueMsg->thandle, tstrerror(rpcMsg->code)); + + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsChildTableSdb, + .pObj = pTable + }; + sdbDeleteRow(&oper); - mError("table:%s, failed to create in dnode, reason:%s", pTable->info.tableId, tstrerror(rpcMsg->code)); - mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); + mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); + mgmtFreeQueuedMsg(queueMsg); + } } else { - mTrace("table:%s, created in dnode", pTable->info.tableId); + mTrace("table:%s, created in dnode, thandle:%p result:%s", pTable->info.tableId, queueMsg->thandle, + tstrerror(rpcMsg->code)); + if (queueMsg->msgType != TSDB_MSG_TYPE_CM_CREATE_TABLE) { mTrace("table:%s, start to get meta", pTable->info.tableId); - mgmtAddToShellQueue(mgmtCloneQueuedMsg(queueMsg)); + mgmtAddToShellQueue(queueMsg); } else { mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); + mgmtFreeQueuedMsg(queueMsg); } } - - mgmtFreeQueuedMsg(queueMsg); } // not implemented yet diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 4c969124a0..6b27fbbc83 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -18,6 +18,7 @@ #include "taoserror.h" #include "tlog.h" #include "tbalance.h" +#include "tsync.h" #include "tcluster.h" #include "mnode.h" #include "mgmtDb.h" @@ -209,6 +210,18 @@ void mgmtUpdateVgroup(SVgObj *pVgroup) { mgmtSendCreateVgroupMsg(pVgroup, NULL); } +void mgmtUpdateVgroupStatus(SVgObj *pVgroup, int32_t dnodeId, SVnodeLoad *pVload) { + if (pVload->role == TAOS_SYNC_ROLE_MASTER) { + for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { + SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; + if (pVgid->dnodeId == dnodeId) { + pVgroup->inUse = i; + break; + } + } + } +} + SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb) { return pDb->pHead; } diff --git a/src/vnode/main/src/vnodeMain.c b/src/vnode/main/src/vnodeMain.c index 182b4b6257..8db00ed1b1 100644 --- a/src/vnode/main/src/vnodeMain.c +++ b/src/vnode/main/src/vnodeMain.c @@ -285,7 +285,6 @@ static void vnodeBuildVloadMsg(char *pNode, void * param) { SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++]; pLoad->vgId = htonl(pVnode->vgId); - pLoad->vnode = htonl(pVnode->vgId); pLoad->status = pVnode->status; pLoad->role = pVnode->role; } diff --git a/src/vnode/main/src/vnodeWrite.c b/src/vnode/main/src/vnodeWrite.c index 5e03305487..6f06585fa9 100644 --- a/src/vnode/main/src/vnodeWrite.c +++ b/src/vnode/main/src/vnodeWrite.c @@ -57,7 +57,8 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { if (pVnode->status != TAOS_VN_STATUS_READY) return TSDB_CODE_NOT_ACTIVE_VNODE; - // if (pVnode->replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) + if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) + return TSDB_CODE_NO_MASTER; // assign version pVnode->version++; diff --git a/tests/script/tmp/prepare.sim b/tests/script/tmp/prepare.sim index 1f0b893e6d..731b707434 100644 --- a/tests/script/tmp/prepare.sim +++ b/tests/script/tmp/prepare.sim @@ -1,3 +1,4 @@ system sh/stop_dnodes.sh system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1 -system sh/deploy.sh -n dnode2 -m 192.168.0.1 -i 192.168.0.2 \ No newline at end of file +system sh/deploy.sh -n dnode2 -m 192.168.0.1 -i 192.168.0.2 +system sh/deploy.sh -n dnode3 -m 192.168.0.1 -i 192.168.0.3 \ No newline at end of file -- GitLab