diff --git a/include/common/tmsg.h b/include/common/tmsg.h index caf67ee3a99cf054a7b6add325fbd1af7b5e8b3f..c0e8cbae49a78a66da7e091920bc944e06dd9584 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1279,6 +1279,25 @@ typedef struct { int32_t tSerializeSAlterVnodeReplicaReq(void* buf, int32_t bufLen, SAlterVnodeReplicaReq* pReq); int32_t tDeserializeSAlterVnodeReplicaReq(void* buf, int32_t bufLen, SAlterVnodeReplicaReq* pReq); +typedef struct { + int32_t vgId; + int8_t disable; +} SDisableVnodeWriteReq; + +int32_t tSerializeSDisableVnodeWriteReq(void* buf, int32_t bufLen, SDisableVnodeWriteReq* pReq); +int32_t tDeserializeSDisableVnodeWriteReq(void* buf, int32_t bufLen, SDisableVnodeWriteReq* pReq); + +typedef struct { + int32_t srcVgId; + int32_t dstVgId; + uint32_t hashBegin; + uint32_t hashEnd; + int64_t reserved; +} SAlterVnodeHashRangeReq; + +int32_t tSerializeSAlterVnodeHashRangeReq(void* buf, int32_t bufLen, SAlterVnodeHashRangeReq* pReq); +int32_t tDeserializeSAlterVnodeHashRangeReq(void* buf, int32_t bufLen, SAlterVnodeHashRangeReq* pReq); + typedef struct { SMsgHead header; char dbFName[TSDB_DB_FNAME_LEN]; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 7833bdf1393c72b988b1553d691e7d8936b14603..44ee26ef240ddc5f133c08d3076e068d876e7268 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -220,6 +220,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "vnode-drop-ttl-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TRIM, "vnode-trim", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_COMMIT, "vnode-commit", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_DISABLE_WRITE, "vnode-disable-write", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_SCH_MSG) diff --git a/include/libs/tfs/tfs.h b/include/libs/tfs/tfs.h index 3af75e0eaf443e7887dacd6a3056be4241ec5c9b..cbf1d60e35ac1c4187ddb0a5d8cc366714cbaec3 100644 --- a/include/libs/tfs/tfs.h +++ b/include/libs/tfs/tfs.h @@ -150,7 +150,7 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname); * @param nrname The rel name of new file. * @return int32_t 0 for success, -1 for failure. */ -int32_t tfsRename(STfs *pTfs, char *orname, char *nrname); +int32_t tfsRename(STfs *pTfs, const char *orname, const char *nrname); /** * @brief Init file object in tfs. diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 95f74da8034eb70fff4c09802e55e6f754b79323..322143b71f68614bff9a97a262345754fe04ee8e 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4118,6 +4118,68 @@ int32_t tDeserializeSAlterVnodeReplicaReq(void *buf, int32_t bufLen, SAlterVnode return 0; } +int32_t tSerializeSDisableVnodeWriteReq(void *buf, int32_t bufLen, SDisableVnodeWriteReq *pReq) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1; + if (tEncodeI8(&encoder, pReq->disable) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSDisableVnodeWriteReq(void *buf, int32_t bufLen, SDisableVnodeWriteReq *pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->disable) < 0) return -1; + + tEndDecode(&decoder); + tDecoderClear(&decoder); + return 0; +} + +int32_t tSerializeSAlterVnodeHashRangeReq(void *buf, int32_t bufLen, SAlterVnodeHashRangeReq *pReq) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI32(&encoder, pReq->srcVgId) < 0) return -1; + if (tEncodeI32(&encoder, pReq->dstVgId) < 0) return -1; + if (tEncodeI32(&encoder, pReq->hashBegin) < 0) return -1; + if (tEncodeI32(&encoder, pReq->hashEnd) < 0) return -1; + if (tEncodeI64(&encoder, pReq->reserved) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSAlterVnodeHashRangeReq(void *buf, int32_t bufLen, SAlterVnodeHashRangeReq *pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->srcVgId) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->dstVgId) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->hashBegin) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->hashEnd) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->reserved) < 0) return -1; + + tEndDecode(&decoder); + tDecoderClear(&decoder); + return 0; +} + int32_t tSerializeSKillQueryReq(void *buf, int32_t bufLen, SKillQueryReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 16fe6c1b91f8a7788ad4a48993b337fe6bc7b160..d441d3d187c550fb4c62ab8205f5823f9c80363d 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -181,6 +181,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 6e724f4d4322c3aee7a143b1cffaa4cae155d3e7..e3fa2964b74b697cd507a9fddcb27a848010e2e1 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -54,6 +54,7 @@ typedef struct { int32_t vgVersion; int32_t refCount; int8_t dropped; + int8_t disable; char *path; SVnode *pImpl; SMultiWorker pWriteW; @@ -80,13 +81,15 @@ typedef struct { SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId); void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode); int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl); -void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode); +void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal); // vmHandle.c SArray *vmGetMsgHandles(); int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); // vmFile.c int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 47772acbdce512a73f24977edb08ad62e0ccef79..1c1b8e32cd25393280ff1986cb39a058accc165e 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -281,7 +281,94 @@ _OVER: return code; } -int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { +int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { + SDisableVnodeWriteReq req = {0}; + if (tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &req) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable); + + SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId); + if (pVnode == NULL) { + dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr()); + terrno = TSDB_CODE_VND_NOT_EXIST; + return -1; + } + + pVnode->disable = req.disable; + vmReleaseVnode(pMgmt, pVnode); + return 0; +} + +int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { + SAlterVnodeHashRangeReq req = {0}; + if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + int32_t srcVgId = req.srcVgId; + int32_t dstVgId = req.dstVgId; + dInfo("vgId:%d, start to alter vnode hashrange[%u, %u), dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd, + req.dstVgId); + + SVnodeObj *pVnode = vmAcquireVnode(pMgmt, srcVgId); + if (pVnode == NULL) { + dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr()); + terrno = TSDB_CODE_VND_NOT_EXIST; + return -1; + } + + SWrapperCfg wrapperCfg = { + .dropped = pVnode->dropped, + .vgId = dstVgId, + .vgVersion = pVnode->vgVersion, + }; + tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path)); + + dInfo("vgId:%d, close vnode", srcVgId); + vmCloseVnode(pMgmt, pVnode, true); + + char srcPath[TSDB_FILENAME_LEN] = {0}; + char dstPath[TSDB_FILENAME_LEN] = {0}; + snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId); + snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId); + + dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath); + if (vnodeAlterHashRange(srcPath, dstPath, &req, pMgmt->pTfs) < 0) { + dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr()); + return -1; + } + + dInfo("vgId:%d, start to open vnode", dstVgId); + SVnode *pImpl = vnodeOpen(dstPath, pMgmt->pTfs, pMgmt->msgCb); + if (pImpl == NULL) { + dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr()); + return -1; + } + + if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) { + dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr()); + return -1; + } + + if (vnodeStart(pImpl) != 0) { + dError("vgId:%d, failed to start sync since %s", dstVgId, terrstr()); + return -1; + } + + if (vmWriteVnodeListToFile(pMgmt) != 0) { + dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr()); + return -1; + } + + dInfo("vgId:%d, vnode hashrange is altered", dstVgId); + return 0; +} + +int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { SAlterVnodeReplicaReq alterReq = {0}; if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -289,16 +376,16 @@ int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { } int32_t vgId = alterReq.vgId; - dInfo("vgId:%d, start to alter vnode, replica:%d selfIndex:%d strict:%d", alterReq.vgId, alterReq.replica, - alterReq.selfIndex, alterReq.strict); + dInfo("vgId:%d, start to alter vnode, replica:%d selfIndex:%d strict:%d", vgId, alterReq.replica, alterReq.selfIndex, + alterReq.strict); for (int32_t i = 0; i < alterReq.replica; ++i) { SReplica *pReplica = &alterReq.replicas[i]; - dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", alterReq.vgId, i, pReplica->fqdn, pReplica->port, pReplica->port); + dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port); } if (alterReq.replica <= 0 || alterReq.selfIndex < 0 || alterReq.selfIndex >= alterReq.replica) { terrno = TSDB_CODE_INVALID_MSG; - dError("vgId:%d, failed to alter replica since invalid msg", alterReq.vgId); + dError("vgId:%d, failed to alter replica since invalid msg", vgId); return -1; } @@ -306,7 +393,7 @@ int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort || strcmp(pReplica->fqdn, tsLocalFqdn) != 0) { terrno = TSDB_CODE_INVALID_MSG; - dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", alterReq.vgId, pReplica->id, pReplica->fqdn, + dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn, pReplica->port); return -1; } @@ -325,13 +412,13 @@ int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { .vgVersion = pVnode->vgVersion, }; tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path)); - vmCloseVnode(pMgmt, pVnode); + vmCloseVnode(pMgmt, pVnode, false); char path[TSDB_FILENAME_LEN] = {0}; snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId); dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path); - if (vnodeAlter(path, &alterReq, pMgmt->pTfs) < 0) { + if (vnodeAlterReplica(path, &alterReq, pMgmt->pTfs) < 0) { dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr()); return -1; } @@ -387,7 +474,7 @@ int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return -1; } - vmCloseVnode(pMgmt, pVnode); + vmCloseVnode(pMgmt, pVnode, false); vmWriteVnodeListToFile(pMgmt); dInfo("vgId:%d, is dropped", vgId); @@ -451,7 +538,8 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 99ba9b9b3bcc1e0efe9d1a1bb987e01c89786374..a0c1754e82fffbc66beab0946efa012a4fcb0d6a 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -76,7 +76,7 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { return code; } -void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { +void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) { char path[TSDB_FILENAME_LEN] = {0}; vnodeProposeCommitOnNeed(pVnode->pImpl); @@ -124,10 +124,26 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { vnodePostClose(pVnode->pImpl); vmFreeQueue(pMgmt, pVnode); + + if (commitAndRemoveWal) { + dInfo("vgId:%d, commit data", pVnode->vgId); + vnodeSyncCommit(pVnode->pImpl); + vnodeBegin(pVnode->pImpl); + dInfo("vgId:%d, commit data finished", pVnode->vgId); + } + vnodeClose(pVnode->pImpl); pVnode->pImpl = NULL; dInfo("vgId:%d, vnode is closed", pVnode->vgId); + if (commitAndRemoveWal) { + char path[TSDB_FILENAME_LEN] = {0}; + snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP); + dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path); + tfsRmdir(pMgmt->pTfs, path); + tfsMkdir(pMgmt->pTfs, path); + } + if (pVnode->dropped) { dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped); snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId); @@ -257,7 +273,7 @@ static void *vmCloseVnodeInThread(void *param) { pMgmt->state.openVnodes, pMgmt->state.totalVnodes); tmsgReportStartup("vnode-close", stepDesc); - vmCloseVnode(pMgmt, pVnode); + vmCloseVnode(pMgmt, pVnode, false); } dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index cd29b115508ccfcd51d9c130c4847b2ef265dba5..600dd3671def2e684a075fc48c351112db1224b5 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -41,7 +41,13 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { code = vmProcessDropVnodeReq(pMgmt, pMsg); break; case TDMT_VND_ALTER_REPLICA: - code = vmProcessAlterVnodeReq(pMgmt, pMsg); + code = vmProcessAlterVnodeReplicaReq(pMgmt, pMsg); + break; + case TDMT_VND_DISABLE_WRITE: + code = vmProcessDisableVnodeWriteReq(pMgmt, pMsg); + break; + case TDMT_VND_ALTER_HASHRANGE: + code = vmProcessAlterHashRangeReq(pMgmt, pMsg); break; default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; @@ -191,14 +197,21 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp terrno = TSDB_CODE_NO_DISKSPACE; code = terrno; dError("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr(code)); - } else if ((pMsg->msgType == TDMT_VND_SUBMIT) && (grantCheck(TSDB_GRANT_STORAGE) != TSDB_CODE_SUCCESS)) { + break; + } + if (pMsg->msgType == TDMT_VND_SUBMIT && (grantCheck(TSDB_GRANT_STORAGE) != TSDB_CODE_SUCCESS)) { terrno = TSDB_CODE_VND_NO_WRITE_AUTH; code = terrno; dDebug("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr(code)); - } else { - dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg); - taosWriteQitem(pVnode->pWriteW.queue, pMsg); + break; + } + if (pMsg->msgType != TDMT_VND_ALTER_CONFIRM && pVnode->disable) { + dDebug("vgId:%d, msg:%p put into vnode-write queue failed since its disable", pVnode->vgId, pMsg); + terrno = TSDB_CODE_VND_STOPPED; + break; } + dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg); + taosWriteQitem(pVnode->pWriteW.queue, pMsg); break; case SYNC_QUEUE: dGTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index dcb63f65246cfc103790c2e46b2ca4c8bd4beafe..d2b9618c604196a43f31a8dafccc450ef3152c10 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -39,7 +39,7 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)]; if (msgFp == NULL) { terrno = TSDB_CODE_MSG_NOT_PROCESSED; - dGError("msg:%p, not processed since no handler", pMsg); + dGError("msg:%p, not processed since no handler, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); return -1; } diff --git a/source/dnode/mnode/impl/inc/mndDb.h b/source/dnode/mnode/impl/inc/mndDb.h index cea0a43b61bf53cd1bcc47cae52000295e80fd90..9edfd9bf3b2e35c30639ad69ee84e3b1446271e3 100644 --- a/source/dnode/mnode/impl/inc/mndDb.h +++ b/source/dnode/mnode/impl/inc/mndDb.h @@ -30,6 +30,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUseDbReq *pReq); bool mndIsDbReady(SMnode *pMnode, SDbObj *pDb); +SSdbRaw *mndDbActionEncode(SDbObj *pDb); const char *mndGetDbStr(const char *src); #ifdef __cplusplus diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index bdfda14a32772644a476b6ad6cbdc7d0e3989795..ea1d3281d40352a2e5bb4c7b30b823c6c60eb8c8 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -32,7 +32,6 @@ #define DB_VER_NUMBER 1 #define DB_RESERVE_SIZE 54 -static SSdbRaw *mndDbActionEncode(SDbObj *pDb); static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw); static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb); static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb); @@ -74,7 +73,7 @@ int32_t mndInitDb(SMnode *pMnode) { void mndCleanupDb(SMnode *pMnode) {} -static SSdbRaw *mndDbActionEncode(SDbObj *pDb) { +SSdbRaw *mndDbActionEncode(SDbObj *pDb) { terrno = TSDB_CODE_OUT_OF_MEMORY; int32_t size = sizeof(SDbObj) + pDb->cfg.numOfRetensions * sizeof(SRetention) + DB_RESERVE_SIZE; @@ -259,6 +258,7 @@ static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) { pOld->updateTime = pNew->updateTime; pOld->cfgVersion = pNew->cfgVersion; pOld->vgVersion = pNew->vgVersion; + pOld->cfg.numOfVgroups = pNew->cfg.numOfVgroups; pOld->cfg.buffer = pNew->cfg.buffer; pOld->cfg.pageSize = pNew->cfg.pageSize; pOld->cfg.pages = pNew->cfg.pages; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 54ea9e7b2437b303088fcdd9d003eb1aa19af8e5..67780a7369e41a5a6815f17ebb393110b7b29f0b 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -59,6 +59,7 @@ int32_t mndInitVgroup(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_ALTER_HASHRANGE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_DISABLE_WRITE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_REDISTRIBUTE_VGROUP, mndProcessRedistributeVgroupMsg); mndSetMsgHandle(pMnode, TDMT_MND_SPLIT_VGROUP, mndProcessSplitVgroupMsg); @@ -355,9 +356,7 @@ static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *p SReplica *pReplica = &alterReq.replicas[v]; SVnodeGid *pVgid = &pVgroup->vnodeGid[v]; SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); - if (pVgidDnode == NULL) { - return NULL; - } + if (pVgidDnode == NULL) return NULL; pReplica->id = pVgidDnode->id; pReplica->port = pVgidDnode->port; @@ -397,6 +396,57 @@ static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *p return pReq; } +static void *mndBuildDisableVnodeWriteReq(SMnode *pMnode, SDbObj *pDb, int32_t vgId, int32_t *pContLen) { + SDisableVnodeWriteReq disableReq = { + .vgId = vgId, + .disable = 1, + }; + + mInfo("vgId:%d, build disable vnode write req", vgId); + int32_t contLen = tSerializeSDisableVnodeWriteReq(NULL, 0, &disableReq); + if (contLen < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + void *pReq = taosMemoryMalloc(contLen); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + tSerializeSDisableVnodeWriteReq(pReq, contLen, &disableReq); + *pContLen = contLen; + return pReq; +} + +static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, SVgObj *pVgroup, int32_t dstVgId, int32_t *pContLen) { + SAlterVnodeHashRangeReq alterReq = { + .srcVgId = pVgroup->vgId, + .dstVgId = dstVgId, + .hashBegin = pVgroup->hashBegin, + .hashEnd = pVgroup->hashEnd, + }; + + mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, begin:%u, end:%u", pVgroup->vgId, dstVgId, + pVgroup->hashBegin, pVgroup->hashEnd); + int32_t contLen = tSerializeSAlterVnodeHashRangeReq(NULL, 0, &alterReq); + if (contLen < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + void *pReq = taosMemoryMalloc(contLen); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + tSerializeSAlterVnodeHashRangeReq(pReq, contLen, &alterReq); + *pContLen = contLen; + return pReq; +} + void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) { SDropVnodeReq dropReq = {0}; dropReq.dnodeId = pDnode->id; @@ -1029,6 +1079,7 @@ int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pD STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgroup); + mInfo("vgId:%d, build alter vnode confirm req", pVgroup->vgId); int32_t contLen = sizeof(SMsgHead); SMsgHead *pHead = taosMemoryMalloc(contLen); if (pHead == NULL) { @@ -1053,7 +1104,25 @@ int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pD return 0; } -int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) { return 0; } +static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t dstVgId) { + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgroup); + + int32_t contLen = 0; + void *pReq = mndBuildAlterVnodeHashRangeReq(pMnode, pVgroup, dstVgId, &contLen); + if (pReq == NULL) return -1; + + action.pCont = pReq; + action.contLen = contLen; + action.msgType = TDMT_VND_ALTER_HASHRANGE; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + + return 0; +} int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) { STransAction action = {0}; @@ -1099,6 +1168,30 @@ int32_t mndAddAlterVnodeReplicaAction(SMnode *pMnode, STrans *pTrans, SDbObj *pD return 0; } +static int32_t mndAddDisableVnodeWriteAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) { + SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId); + if (pDnode == NULL) return -1; + + STransAction action = {0}; + action.epSet = mndGetDnodeEpset(pDnode); + mndReleaseDnode(pMnode, pDnode); + + int32_t contLen = 0; + void *pReq = mndBuildDisableVnodeWriteReq(pMnode, pDb, pVgroup->vgId, &contLen); + if (pReq == NULL) return -1; + + action.pCont = pReq; + action.contLen = contLen; + action.msgType = TDMT_VND_DISABLE_WRITE; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + + return 0; +} + int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, bool isRedo) { STransAction action = {0}; @@ -1763,9 +1856,11 @@ static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, } static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) { - int32_t code = -1; - STrans *pTrans = NULL; - SArray *pArray = mndBuildDnodesArray(pMnode, 0); + int32_t code = -1; + STrans *pTrans = NULL; + SSdbRaw *pRaw = NULL; + SDbObj dbObj = {0}; + SArray *pArray = mndBuildDnodesArray(pMnode, 0); pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "split-vgroup"); if (pTrans == NULL) goto _OVER; @@ -1784,18 +1879,21 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg1, pArray) != 0) goto _OVER; if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId) != 0) goto _OVER; if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg1, &newVg1.vnodeGid[1]) != 0) goto _OVER; - if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER; } else if (newVg1.replica == 3) { SVnodeGid del1 = {0}; if (mndRemoveVnodeFromVgroup(pMnode, pTrans, &newVg1, pArray, &del1) != 0) goto _OVER; if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg1, &del1, true) != 0) goto _OVER; if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId) != 0) goto _OVER; if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId) != 0) goto _OVER; - if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER; } else { goto _OVER; } + for (int32_t i = 0; i < newVg1.replica; ++i) { + if (mndAddDisableVnodeWriteAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[i].dnodeId) != 0) goto _OVER; + } + if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER; + mInfo("vgId:%d, vgroup info after adjust replica, replica:%d hashBegin:%u hashEnd:%u vnode:0 dnode:%d", newVg1.vgId, newVg1.replica, newVg1.hashBegin, newVg1.hashEnd, newVg1.vnodeGid[0].dnodeId); for (int32_t i = 0; i < newVg1.replica; ++i) { @@ -1815,13 +1913,23 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj mInfo("vgId:%d, vgroup info after adjust hash, replica:%d hashBegin:%u hashEnd:%u vnode:0 dnode:%d", newVg1.vgId, newVg1.replica, newVg1.hashBegin, newVg1.hashEnd, newVg1.vnodeGid[0].dnodeId); + for (int32_t i = 0; i < newVg1.replica; ++i) { + mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId); + } mInfo("vgId:%d, vgroup info after adjust hash, replica:%d hashBegin:%u hashEnd:%u vnode:0 dnode:%d", newVg2.vgId, newVg2.replica, newVg2.hashBegin, newVg2.hashEnd, newVg2.vnodeGid[0].dnodeId); + for (int32_t i = 0; i < newVg1.replica; ++i) { + mInfo("vgId:%d, vnode:%d dnode:%d", newVg2.vgId, i, newVg2.vnodeGid[i].dnodeId); + } - if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER; - if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, pDb, &newVg2) != 0) goto _OVER; + int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); + if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, &newVg1, maxVgId) != 0) goto _OVER; + newVg1.vgId = maxVgId; + + maxVgId++; + if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, &newVg2, maxVgId) != 0) goto _OVER; + newVg2.vgId = maxVgId; -#if 0 // adjust vgroup replica if (pDb->cfg.replications != newVg1.replica) { if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray) != 0) goto _OVER; @@ -1829,38 +1937,38 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj if (pDb->cfg.replications != newVg2.replica) { if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray) != 0) goto _OVER; } -#endif - { - SSdbRaw *pRaw = mndVgroupActionEncode(&newVg1); - if (pRaw == NULL) return -1; - if (mndTransAppendCommitlog(pTrans, pRaw) != 0) { - sdbFreeRaw(pRaw); - return -1; - } - (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY); - } + pRaw = mndVgroupActionEncode(&newVg1); + if (pRaw == NULL) goto _OVER; + if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER; + (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY); + pRaw = NULL; - { - SSdbRaw *pRaw = mndVgroupActionEncode(&newVg2); - if (pRaw == NULL) return -1; - if (mndTransAppendCommitlog(pTrans, pRaw) != 0) { - sdbFreeRaw(pRaw); - return -1; - } - (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY); - } + pRaw = mndVgroupActionEncode(&newVg2); + if (pRaw == NULL) goto _OVER; + if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER; + (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY); + pRaw = NULL; - mInfo("vgId:%d, vgroup info after adjust hash, replica:%d hashBegin:%u hashEnd:%u vnode:0 dnode:%d", newVg1.vgId, - newVg1.replica, newVg1.hashBegin, newVg1.hashEnd, newVg1.vnodeGid[0].dnodeId); - for (int32_t i = 0; i < newVg1.replica; ++i) { - mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId); - } - mInfo("vgId:%d, vgroup info after adjust hash, replica:%d hashBegin:%u hashEnd:%u vnode:0 dnode:%d", newVg2.vgId, - newVg2.replica, newVg2.hashBegin, newVg2.hashEnd, newVg2.vnodeGid[0].dnodeId); - for (int32_t i = 0; i < newVg1.replica; ++i) { - mInfo("vgId:%d, vnode:%d dnode:%d", newVg2.vgId, i, newVg2.vnodeGid[i].dnodeId); - } + pRaw = mndVgroupActionEncode(pVgroup); + if (pRaw == NULL) goto _OVER; + if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER; + (void)sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED); + pRaw = NULL; + + memcpy(&dbObj, pDb, sizeof(SDbObj)); + if (dbObj.cfg.pRetensions != NULL) { + dbObj.cfg.pRetensions = taosArrayDup(pDb->cfg.pRetensions, NULL); + if (dbObj.cfg.pRetensions == NULL) goto _OVER; + } + dbObj.vgVersion++; + dbObj.updateTime = taosGetTimestampMs(); + dbObj.cfg.numOfVgroups++; + pRaw = mndDbActionEncode(&dbObj); + if (pRaw == NULL) goto _OVER; + if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER; + (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY); + pRaw = NULL; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; @@ -1868,22 +1976,29 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj _OVER: taosArrayDestroy(pArray); mndTransDrop(pTrans); + sdbFreeRaw(pRaw); + taosArrayDestroy(dbObj.cfg.pRetensions); return code; } static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; int32_t code = -1; - int32_t vgId = 2; SVgObj *pVgroup = NULL; SDbObj *pDb = NULL; - mInfo("vgId:%d, start to split", vgId); + SSplitVgroupReq req = {0}; + if (tDeserializeSSplitVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto _OVER; + } + + mInfo("vgId:%d, start to split", req.vgId); if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_SPLIT_VGROUP) != 0) { goto _OVER; } - pVgroup = mndAcquireVgroup(pMnode, vgId); + pVgroup = mndAcquireVgroup(pMnode, req.vgId); if (pVgroup == NULL) goto _OVER; pDb = mndAcquireDb(pMnode, pVgroup->dbName); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 7d368c76c55d61af9305acb23d180cefdc936396..db35a7c08822fed36110de849faff106389b487a 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -50,13 +50,16 @@ extern const SVnodeCfg vnodeCfgDefault; int32_t vnodeInit(int32_t nthreads); void vnodeCleanup(); int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs); -int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs); +int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs); +int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); void vnodePreClose(SVnode *pVnode); void vnodePostClose(SVnode *pVnode); void vnodeSyncCheckTimeout(SVnode *pVnode); void vnodeClose(SVnode *pVnode); +int32_t vnodeSyncCommit(SVnode *pVnode); +int32_t vnodeBegin(SVnode* pVnode); int32_t vnodeStart(SVnode *pVnode); void vnodeStop(SVnode *pVnode); diff --git a/source/dnode/vnode/src/meta/metaCommit.c b/source/dnode/vnode/src/meta/metaCommit.c index f61930b84cf54b189af0bc36cb66ff69402b27fc..f597c100d0588dae9123af444dc442c0b7de2c2b 100644 --- a/source/dnode/vnode/src/meta/metaCommit.c +++ b/source/dnode/vnode/src/meta/metaCommit.c @@ -56,4 +56,7 @@ int metaPrepareAsyncCommit(SMeta *pMeta) { } // abort the meta txn -int metaAbort(SMeta *pMeta) { return tdbAbort(pMeta->pEnv, pMeta->txn); } +int metaAbort(SMeta *pMeta) { + if (!pMeta->txn) return 0; + return tdbAbort(pMeta->pEnv, pMeta->txn); +} diff --git a/source/dnode/vnode/src/meta/metaOpen.c b/source/dnode/vnode/src/meta/metaOpen.c index 609ffc58c3d4ed00c3b5db729179a3559b36e93d..e5bc301fa17928e20434e378fcb16a4bb278ea78 100644 --- a/source/dnode/vnode/src/meta/metaOpen.c +++ b/source/dnode/vnode/src/meta/metaOpen.c @@ -203,7 +203,7 @@ _err: int metaClose(SMeta *pMeta) { if (pMeta) { - if (pMeta->pEnv) tdbAbort(pMeta->pEnv, pMeta->txn); + if (pMeta->pEnv) metaAbort(pMeta); if (pMeta->pCache) metaCacheClose(pMeta); if (pMeta->pIdx) metaCloseIdx(pMeta); if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb); diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index b1575fb49642b4b5ee93265f8286c64e433f98fa..40112c55791fe8d2cd3545f57169e2ba7ec33838 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -219,10 +219,10 @@ void vnodeBufPoolAddToFreeList(SVBufPool *pPool) { if (pPool->node.size != size) { SVBufPool *pNewPool = NULL; if (vnodeBufPoolCreate(pVnode, pPool->id, size, &pNewPool) < 0) { - vWarn("vgId:%d failed to change buffer pool of id %d size from %" PRId64 " to %" PRId64 " since %s", + vWarn("vgId:%d, failed to change buffer pool of id %d size from %" PRId64 " to %" PRId64 " since %s", TD_VID(pVnode), pPool->id, pPool->node.size, size, tstrerror(errno)); } else { - vInfo("vgId:%d buffer pool of id %d size changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->id, + vInfo("vgId:%d, buffer pool of id %d size changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->id, pPool->node.size, size); vnodeBufPoolDestroy(pPool); @@ -232,7 +232,7 @@ void vnodeBufPoolAddToFreeList(SVBufPool *pPool) { } // add to free list - vDebug("vgId:%d buffer pool %p of id %d is added to free list", TD_VID(pVnode), pPool, pPool->id); + vDebug("vgId:%d, buffer pool %p of id %d is added to free list", TD_VID(pVnode), pPool, pPool->id); vnodeBufPoolReset(pPool); pPool->freeNext = pVnode->freeList; pVnode->freeList = pPool; @@ -307,7 +307,7 @@ int32_t vnodeBufPoolRecycle(SVBufPool *pPool) { SVnode *pVnode = pPool->pVnode; - vDebug("vgId:%d recycle buffer pool %p of id %d", TD_VID(pVnode), pPool, pPool->id); + vDebug("vgId:%d, recycle buffer pool %p of id %d", TD_VID(pVnode), pPool, pPool->id); taosThreadMutexLock(&pPool->mutex); diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index aeb90b7080ac0dd0b0e5dd0b4735fc865e842703..a3401926e6d3d55ed7c517f55bd8f5a0b4880d95 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -28,10 +28,10 @@ static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) { if (pVnode->onRecycle == NULL) { if (pVnode->recycleHead == NULL) { - vDebug("vgId:%d no recyclable buffer pool", TD_VID(pVnode)); + vDebug("vgId:%d, no recyclable buffer pool", TD_VID(pVnode)); goto _exit; } else { - vDebug("vgId:%d buffer pool %p of id %d on recycle queue, try to recycle", TD_VID(pVnode), pVnode->recycleHead, + vDebug("vgId:%d, buffer pool %p of id %d on recycle queue, try to recycle", TD_VID(pVnode), pVnode->recycleHead, pVnode->recycleHead->id); pVnode->onRecycle = pVnode->recycleHead; @@ -50,7 +50,7 @@ static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) { _exit: if (code) { - vError("vgId:%d %s failed since %s", TD_VID(pVnode), __func__, tstrerror(code)); + vError("vgId:%d, %s failed since %s", TD_VID(pVnode), __func__, tstrerror(code)); } return code; } @@ -65,7 +65,7 @@ static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) { ++nTry; if (pVnode->freeList) { - vDebug("vgId:%d allocate free buffer pool on %d try, pPool:%p id:%d", TD_VID(pVnode), nTry, pVnode->freeList, + vDebug("vgId:%d, allocate free buffer pool on %d try, pPool:%p id:%d", TD_VID(pVnode), nTry, pVnode->freeList, pVnode->freeList->id); pVnode->inUse = pVnode->freeList; @@ -74,13 +74,13 @@ static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) { pVnode->inUse->freeNext = NULL; break; } else { - vDebug("vgId:%d no free buffer pool on %d try, try to recycle...", TD_VID(pVnode), nTry); + vDebug("vgId:%d, no free buffer pool on %d try, try to recycle...", TD_VID(pVnode), nTry); code = vnodeTryRecycleBufPool(pVnode); TSDB_CHECK_CODE(code, lino, _exit); if (pVnode->freeList == NULL) { - vDebug("vgId:%d no free buffer pool on %d try, wait %d ms...", TD_VID(pVnode), nTry, WAIT_TIME_MILI_SEC); + vDebug("vgId:%d, no free buffer pool on %d try, wait %d ms...", TD_VID(pVnode), nTry, WAIT_TIME_MILI_SEC); struct timeval tv; struct timespec ts; @@ -105,7 +105,7 @@ static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) { _exit: taosThreadMutexUnlock(&pVnode->mutex); if (code) { - vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); } return code; } @@ -140,7 +140,7 @@ int vnodeBegin(SVnode *pVnode) { _exit: if (code) { terrno = code; - vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); } return code; } @@ -351,7 +351,7 @@ static void vnodeReturnBufPool(SVnode *pVnode) { if (nRef == 0) { vnodeBufPoolAddToFreeList(pPool); } else if (nRef > 0) { - vDebug("vgId:%d buffer pool %p of id %d is added to recycle queue", TD_VID(pVnode), pPool, pPool->id); + vDebug("vgId:%d, buffer pool %p of id %d is added to recycle queue", TD_VID(pVnode), pPool, pPool->id); if (pVnode->recycleTail == NULL) { pPool->recyclePrev = pPool->recycleNext = NULL; diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 61cb75b1da592a4b6d06b3fea21a2bb7b69ae22a..da27b5e4d3a326ada6971e0facbf7af2f155d65a 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -58,7 +58,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { return 0; } -int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) { +int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) { SVnodeInfo info = {0}; char dir[TSDB_FILENAME_LEN] = {0}; int32_t ret = 0; @@ -107,6 +107,117 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) { return 0; } +int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, STfs *pTfs) { + int32_t ret = tfsRename(pTfs, srcPath, dstPath); + if (ret != 0) return ret; + + char oldRname[TSDB_FILENAME_LEN] = {0}; + char newRname[TSDB_FILENAME_LEN] = {0}; + char tsdbPath[TSDB_FILENAME_LEN] = {0}; + char tsdbFilePrefix[TSDB_FILENAME_LEN] = {0}; + snprintf(tsdbPath, TSDB_FILENAME_LEN, "%s%stsdb", dstPath, TD_DIRSEP); + snprintf(tsdbFilePrefix, TSDB_FILENAME_LEN, "tsdb%sv", TD_DIRSEP); + + STfsDir *tsdbDir = tfsOpendir(pTfs, tsdbPath); + if (tsdbDir == NULL) return 0; + + while (1) { + const STfsFile *tsdbFile = tfsReaddir(tsdbDir); + if (tsdbFile == NULL) break; + if (tsdbFile->rname == NULL) continue; + tstrncpy(oldRname, tsdbFile->rname, TSDB_FILENAME_LEN); + + char *tsdbFilePrefixPos = strstr(oldRname, tsdbFilePrefix); + if (tsdbFilePrefixPos == NULL) continue; + + int32_t tsdbFileVgId = atoi(tsdbFilePrefixPos + 6); + if (tsdbFileVgId == srcVgId) { + char *tsdbFileSurfixPos = strstr(tsdbFilePrefixPos, "f"); + if (tsdbFileSurfixPos == NULL) continue; + + tsdbFilePrefixPos[6] = 0; + snprintf(newRname, TSDB_FILENAME_LEN, "%s%d%s", oldRname, dstVgId, tsdbFileSurfixPos); + vInfo("vgId:%d, rename file from %s to %s", dstVgId, tsdbFile->rname, newRname); + + ret = tfsRename(pTfs, tsdbFile->rname, newRname); + if (ret != 0) { + vInfo("vgId:%d, failed to rename file from %s to %s since %s", dstVgId, tsdbFile->rname, newRname, terrstr()); + tfsClosedir(tsdbDir); + return ret; + } + } + } + + tfsClosedir(tsdbDir); + return 0; +} + +int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs) { + SVnodeInfo info = {0}; + char dir[TSDB_FILENAME_LEN] = {0}; + int32_t ret = 0; + + if (pTfs) { + snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, srcPath); + } else { + snprintf(dir, TSDB_FILENAME_LEN, "%s", srcPath); + } + + // todo add stat file to handle exception while vnode open + + ret = vnodeLoadInfo(dir, &info); + if (ret < 0) { + vError("vgId:%d, failed to read vnode config from %s since %s", pReq->srcVgId, srcPath, tstrerror(terrno)); + return -1; + } + + vInfo("vgId:%d, start to alter hashrange from [%u, %u) to [%u, %u)", pReq->srcVgId, info.config.hashBegin, + info.config.hashEnd, pReq->hashBegin, pReq->hashEnd); + info.config.vgId = pReq->dstVgId; + info.config.hashBegin = pReq->hashBegin; + info.config.hashEnd = pReq->hashEnd; + info.config.walCfg.vgId = pReq->dstVgId; + + SSyncCfg *pCfg = &info.config.syncCfg; + pCfg->myIndex = 0; + pCfg->replicaNum = 1; + memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo)); + + vInfo("vgId:%d, alter vnode replicas to 1", pReq->srcVgId); + SNodeInfo *pNode = &pCfg->nodeInfo[0]; + pNode->nodePort = tsServerPort; + tstrncpy(pNode->nodeFqdn, tsLocalFqdn, TSDB_FQDN_LEN); + (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); + vInfo("vgId:%d, ep:%s:%u dnode:%d", pReq->srcVgId, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId); + + info.config.syncCfg = *pCfg; + + ret = vnodeSaveInfo(dir, &info); + if (ret < 0) { + vError("vgId:%d, failed to save vnode config since %s", pReq->dstVgId, tstrerror(terrno)); + return -1; + } + + ret = vnodeCommitInfo(dir, &info); + if (ret < 0) { + vError("vgId:%d, failed to commit vnode config since %s", pReq->dstVgId, tstrerror(terrno)); + return -1; + } + + vInfo("vgId:%d, start to rename %s to %s", pReq->dstVgId, srcPath, dstPath); + ret = vnodeRenameVgroupId(srcPath, dstPath, pReq->srcVgId, pReq->dstVgId, pTfs); + if (ret < 0) { + vError("vgId:%d, failed to rename vnode from %s to %s since %s", pReq->dstVgId, srcPath, dstPath, + tstrerror(terrno)); + return -1; + } + + // todo vnode compact here + + vInfo("vgId:%d, vnode hashrange is altered", info.config.vgId); + return 0; +} + void vnodeDestroy(const char *path, STfs *pTfs) { vInfo("path:%s is removed while destroy vnode", path); tfsRmdir(pTfs, path); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 6abc144b911720244e33b3553249b98b48550af7..313b9da9a5caf8aa72f7d102f81abacde81c458a 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -24,7 +24,6 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); @@ -313,9 +312,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp case TDMT_VND_ALTER_CONFIRM: vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp); break; - case TDMT_VND_ALTER_HASHRANGE: - vnodeProcessAlterHashRangeReq(pVnode, version, pReq, len, pRsp); - break; case TDMT_VND_ALTER_CONFIG: vnodeProcessAlterConfigReq(pVnode, version, pReq, len, pRsp); break; @@ -1246,16 +1242,6 @@ static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void return 0; } -static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { - vInfo("vgId:%d, alter hashrange msg will be processed", TD_VID(pVnode)); - - // todo - // 1. stop work - // 2. adjust hash range / compact / remove wals / rename vgroups - // 3. reload sync - return 0; -} - static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { bool walChanged = false; bool tsdbChanged = false; diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index ced867e938fc66574e70774e0e99000be2bf2fa5..29b7fa740c85e9ea74def8e53d47edd69c76b743 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -445,6 +445,11 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) { SPgno journalSize = 0; int ret; + if (pTxn->jfd == 0) { + // txn is commited + return 0; + } + // sync the journal file ret = tdbOsFSync(pTxn->jfd); if (ret < 0) { diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index 9a71fc7f30ffebb5cf864d30faea3b8690438112..d73b65c3b5afbb5c6142c7db1e23b6564e196734 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -303,7 +303,7 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname) { return 0; } -int32_t tfsRename(STfs *pTfs, char *orname, char *nrname) { +int32_t tfsRename(STfs *pTfs, const char *orname, const char *nrname) { char oaname[TMPNAME_LEN] = "\0"; char naname[TMPNAME_LEN] = "\0"; diff --git a/tests/script/tsim/dnode/split_vgroup_replica1.sim b/tests/script/tsim/dnode/split_vgroup_replica1.sim index 1bdd322714a1e8e7443d337daacc69d3ce2a4792..51d63d25f482985b67ae37f2d4f005b8010f759e 100644 --- a/tests/script/tsim/dnode/split_vgroup_replica1.sim +++ b/tests/script/tsim/dnode/split_vgroup_replica1.sim @@ -11,7 +11,6 @@ system sh/cfg.sh -n dnode1 -c supportVnodes -v 0 system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode2 -s start sql connect -sql create user u1 pass 'taosdata' print =============== step1 create dnode2 sql create dnode $hostname port 7200 @@ -73,8 +72,21 @@ print =============== step3: create database sql use d1 sql create table d1.st (ts timestamp, i int) tags (j int) sql create table d1.c1 using st tags(1) +sql create table d1.c2 using st tags(2) +sql create table d1.c3 using st tags(3) +sql create table d1.c4 using st tags(4) +sql create table d1.c5 using st tags(5) +sql insert into d1.c1 values (now, 1); +sql insert into d1.c2 values (now, 2); +sql insert into d1.c3 values (now, 3); +sql insert into d1.c4 values (now, 4); +sql insert into d1.c5 values (now, 5); sql show d1.tables -if $rows != 1 then +if $rows != 5 then + return -1 +endi +sql select * from d1.st +if $rows != 5 then return -1 endi @@ -82,6 +94,34 @@ print =============== step4: split print split vgroup 2 sql split vgroup 2 +print =============== step5: check split result +sql show d1.tables +#if $rows != 5 then +# return -1 +#endi +#sql select * from d1.st +#if $rows != 5 then +# return -1 +#endi + +print =============== step6: create tables +sql create table d1.c6 using st tags(6) +sql create table d1.c7 using st tags(7) +sql create table d1.c8 using st tags(8) +sql create table d1.c9 using st tags(9) +sql insert into d1.c6 values (now, 6); +sql insert into d1.c7 values (now, 7); +sql insert into d1.c8 values (now, 8); +sql insert into d1.c9 values (now, 9); +sql show d1.tables +#if $rows != 9 then +# return -1 +#endi +#sql select * from d1.st +#if $rows != 9 then +# return -1 +#endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode2 -s stop -x SIGINT system sh/exec.sh -n dnode3 -s stop -x SIGINT