未验证 提交 fa3356d3 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #19698 from taosdata/enh/TD-18702

enh: split vgroup
......@@ -1279,6 +1279,25 @@ typedef struct {
int32_t tSerializeSAlterVnodeReplicaReq(void* buf, int32_t bufLen, SAlterVnodeReplicaReq* pReq);
int32_t tDeserializeSAlterVnodeReplicaReq(void* buf, int32_t bufLen, SAlterVnodeReplicaReq* pReq);
typedef struct {
int32_t vgId;
int8_t disable;
} SDisableVnodeWriteReq;
int32_t tSerializeSDisableVnodeWriteReq(void* buf, int32_t bufLen, SDisableVnodeWriteReq* pReq);
int32_t tDeserializeSDisableVnodeWriteReq(void* buf, int32_t bufLen, SDisableVnodeWriteReq* pReq);
typedef struct {
int32_t srcVgId;
int32_t dstVgId;
uint32_t hashBegin;
uint32_t hashEnd;
int64_t reserved;
} SAlterVnodeHashRangeReq;
int32_t tSerializeSAlterVnodeHashRangeReq(void* buf, int32_t bufLen, SAlterVnodeHashRangeReq* pReq);
int32_t tDeserializeSAlterVnodeHashRangeReq(void* buf, int32_t bufLen, SAlterVnodeHashRangeReq* pReq);
typedef struct {
SMsgHead header;
char dbFName[TSDB_DB_FNAME_LEN];
......
......@@ -220,6 +220,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "vnode-drop-ttl-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TRIM, "vnode-trim", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_COMMIT, "vnode-commit", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DISABLE_WRITE, "vnode-disable-write", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_SCH_MSG)
......
......@@ -150,7 +150,7 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname);
* @param nrname The rel name of new file.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsRename(STfs *pTfs, char *orname, char *nrname);
int32_t tfsRename(STfs *pTfs, const char *orname, const char *nrname);
/**
* @brief Init file object in tfs.
......
......@@ -4118,6 +4118,68 @@ int32_t tDeserializeSAlterVnodeReplicaReq(void *buf, int32_t bufLen, SAlterVnode
return 0;
}
int32_t tSerializeSDisableVnodeWriteReq(void *buf, int32_t bufLen, SDisableVnodeWriteReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1;
if (tEncodeI8(&encoder, pReq->disable) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSDisableVnodeWriteReq(void *buf, int32_t bufLen, SDisableVnodeWriteReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->disable) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSAlterVnodeHashRangeReq(void *buf, int32_t bufLen, SAlterVnodeHashRangeReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->srcVgId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->dstVgId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->hashBegin) < 0) return -1;
if (tEncodeI32(&encoder, pReq->hashEnd) < 0) return -1;
if (tEncodeI64(&encoder, pReq->reserved) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSAlterVnodeHashRangeReq(void *buf, int32_t bufLen, SAlterVnodeHashRangeReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->srcVgId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->dstVgId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->hashBegin) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->hashEnd) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->reserved) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSKillQueryReq(void *buf, int32_t bufLen, SKillQueryReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
......
......@@ -181,6 +181,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
......
......@@ -54,6 +54,7 @@ typedef struct {
int32_t vgVersion;
int32_t refCount;
int8_t dropped;
int8_t disable;
char *path;
SVnode *pImpl;
SMultiWorker pWriteW;
......@@ -80,13 +81,15 @@ typedef struct {
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId);
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl);
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal);
// vmHandle.c
SArray *vmGetMsgHandles();
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
// vmFile.c
int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
......
......@@ -281,7 +281,94 @@ _OVER:
return code;
}
int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SDisableVnodeWriteReq req = {0};
if (tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable);
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
if (pVnode == NULL) {
dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr());
terrno = TSDB_CODE_VND_NOT_EXIST;
return -1;
}
pVnode->disable = req.disable;
vmReleaseVnode(pMgmt, pVnode);
return 0;
}
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SAlterVnodeHashRangeReq req = {0};
if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
int32_t srcVgId = req.srcVgId;
int32_t dstVgId = req.dstVgId;
dInfo("vgId:%d, start to alter vnode hashrange[%u, %u), dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
req.dstVgId);
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, srcVgId);
if (pVnode == NULL) {
dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
terrno = TSDB_CODE_VND_NOT_EXIST;
return -1;
}
SWrapperCfg wrapperCfg = {
.dropped = pVnode->dropped,
.vgId = dstVgId,
.vgVersion = pVnode->vgVersion,
};
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
dInfo("vgId:%d, close vnode", srcVgId);
vmCloseVnode(pMgmt, pVnode, true);
char srcPath[TSDB_FILENAME_LEN] = {0};
char dstPath[TSDB_FILENAME_LEN] = {0};
snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);
dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath);
if (vnodeAlterHashRange(srcPath, dstPath, &req, pMgmt->pTfs) < 0) {
dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr());
return -1;
}
dInfo("vgId:%d, start to open vnode", dstVgId);
SVnode *pImpl = vnodeOpen(dstPath, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
return -1;
}
if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
dError("vgId:%d, failed to open vnode mgmt since %s", dstVgId, terrstr());
return -1;
}
if (vnodeStart(pImpl) != 0) {
dError("vgId:%d, failed to start sync since %s", dstVgId, terrstr());
return -1;
}
if (vmWriteVnodeListToFile(pMgmt) != 0) {
dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
return -1;
}
dInfo("vgId:%d, vnode hashrange is altered", dstVgId);
return 0;
}
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SAlterVnodeReplicaReq alterReq = {0};
if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
......@@ -289,16 +376,16 @@ int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
int32_t vgId = alterReq.vgId;
dInfo("vgId:%d, start to alter vnode, replica:%d selfIndex:%d strict:%d", alterReq.vgId, alterReq.replica,
alterReq.selfIndex, alterReq.strict);
dInfo("vgId:%d, start to alter vnode, replica:%d selfIndex:%d strict:%d", vgId, alterReq.replica, alterReq.selfIndex,
alterReq.strict);
for (int32_t i = 0; i < alterReq.replica; ++i) {
SReplica *pReplica = &alterReq.replicas[i];
dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", alterReq.vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
}
if (alterReq.replica <= 0 || alterReq.selfIndex < 0 || alterReq.selfIndex >= alterReq.replica) {
terrno = TSDB_CODE_INVALID_MSG;
dError("vgId:%d, failed to alter replica since invalid msg", alterReq.vgId);
dError("vgId:%d, failed to alter replica since invalid msg", vgId);
return -1;
}
......@@ -306,7 +393,7 @@ int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", alterReq.vgId, pReplica->id, pReplica->fqdn,
dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn,
pReplica->port);
return -1;
}
......@@ -325,13 +412,13 @@ int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
.vgVersion = pVnode->vgVersion,
};
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
vmCloseVnode(pMgmt, pVnode);
vmCloseVnode(pMgmt, pVnode, false);
char path[TSDB_FILENAME_LEN] = {0};
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
if (vnodeAlter(path, &alterReq, pMgmt->pTfs) < 0) {
if (vnodeAlterReplica(path, &alterReq, pMgmt->pTfs) < 0) {
dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
return -1;
}
......@@ -387,7 +474,7 @@ int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1;
}
vmCloseVnode(pMgmt, pVnode);
vmCloseVnode(pMgmt, pVnode, false);
vmWriteVnodeListToFile(pMgmt);
dInfo("vgId:%d, is dropped", vgId);
......@@ -451,7 +538,8 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
......
......@@ -76,7 +76,7 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
return code;
}
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) {
char path[TSDB_FILENAME_LEN] = {0};
vnodeProposeCommitOnNeed(pVnode->pImpl);
......@@ -124,10 +124,26 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
vnodePostClose(pVnode->pImpl);
vmFreeQueue(pMgmt, pVnode);
if (commitAndRemoveWal) {
dInfo("vgId:%d, commit data", pVnode->vgId);
vnodeSyncCommit(pVnode->pImpl);
vnodeBegin(pVnode->pImpl);
dInfo("vgId:%d, commit data finished", pVnode->vgId);
}
vnodeClose(pVnode->pImpl);
pVnode->pImpl = NULL;
dInfo("vgId:%d, vnode is closed", pVnode->vgId);
if (commitAndRemoveWal) {
char path[TSDB_FILENAME_LEN] = {0};
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP);
dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path);
tfsRmdir(pMgmt->pTfs, path);
tfsMkdir(pMgmt->pTfs, path);
}
if (pVnode->dropped) {
dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
......@@ -257,7 +273,7 @@ static void *vmCloseVnodeInThread(void *param) {
pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
tmsgReportStartup("vnode-close", stepDesc);
vmCloseVnode(pMgmt, pVnode);
vmCloseVnode(pMgmt, pVnode, false);
}
dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
......
......@@ -41,7 +41,13 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
code = vmProcessDropVnodeReq(pMgmt, pMsg);
break;
case TDMT_VND_ALTER_REPLICA:
code = vmProcessAlterVnodeReq(pMgmt, pMsg);
code = vmProcessAlterVnodeReplicaReq(pMgmt, pMsg);
break;
case TDMT_VND_DISABLE_WRITE:
code = vmProcessDisableVnodeWriteReq(pMgmt, pMsg);
break;
case TDMT_VND_ALTER_HASHRANGE:
code = vmProcessAlterHashRangeReq(pMgmt, pMsg);
break;
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
......@@ -191,14 +197,21 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
terrno = TSDB_CODE_NO_DISKSPACE;
code = terrno;
dError("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr(code));
} else if ((pMsg->msgType == TDMT_VND_SUBMIT) && (grantCheck(TSDB_GRANT_STORAGE) != TSDB_CODE_SUCCESS)) {
break;
}
if (pMsg->msgType == TDMT_VND_SUBMIT && (grantCheck(TSDB_GRANT_STORAGE) != TSDB_CODE_SUCCESS)) {
terrno = TSDB_CODE_VND_NO_WRITE_AUTH;
code = terrno;
dDebug("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr(code));
} else {
break;
}
if (pMsg->msgType != TDMT_VND_ALTER_CONFIRM && pVnode->disable) {
dDebug("vgId:%d, msg:%p put into vnode-write queue failed since its disable", pVnode->vgId, pMsg);
terrno = TSDB_CODE_VND_STOPPED;
break;
}
dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pWriteW.queue, pMsg);
}
break;
case SYNC_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg);
......
......@@ -39,7 +39,7 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
if (msgFp == NULL) {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
dGError("msg:%p, not processed since no handler", pMsg);
dGError("msg:%p, not processed since no handler, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
return -1;
}
......
......@@ -30,6 +30,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUseDbReq *pReq);
bool mndIsDbReady(SMnode *pMnode, SDbObj *pDb);
SSdbRaw *mndDbActionEncode(SDbObj *pDb);
const char *mndGetDbStr(const char *src);
#ifdef __cplusplus
......
......@@ -32,7 +32,6 @@
#define DB_VER_NUMBER 1
#define DB_RESERVE_SIZE 54
static SSdbRaw *mndDbActionEncode(SDbObj *pDb);
static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw);
static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb);
static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb);
......@@ -74,7 +73,7 @@ int32_t mndInitDb(SMnode *pMnode) {
void mndCleanupDb(SMnode *pMnode) {}
static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t size = sizeof(SDbObj) + pDb->cfg.numOfRetensions * sizeof(SRetention) + DB_RESERVE_SIZE;
......@@ -259,6 +258,7 @@ static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) {
pOld->updateTime = pNew->updateTime;
pOld->cfgVersion = pNew->cfgVersion;
pOld->vgVersion = pNew->vgVersion;
pOld->cfg.numOfVgroups = pNew->cfg.numOfVgroups;
pOld->cfg.buffer = pNew->cfg.buffer;
pOld->cfg.pageSize = pNew->cfg.pageSize;
pOld->cfg.pages = pNew->cfg.pages;
......
......@@ -59,6 +59,7 @@ int32_t mndInitVgroup(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_HASHRANGE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DISABLE_WRITE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_REDISTRIBUTE_VGROUP, mndProcessRedistributeVgroupMsg);
mndSetMsgHandle(pMnode, TDMT_MND_SPLIT_VGROUP, mndProcessSplitVgroupMsg);
......@@ -355,9 +356,7 @@ static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *p
SReplica *pReplica = &alterReq.replicas[v];
SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pVgidDnode == NULL) {
return NULL;
}
if (pVgidDnode == NULL) return NULL;
pReplica->id = pVgidDnode->id;
pReplica->port = pVgidDnode->port;
......@@ -397,6 +396,57 @@ static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *p
return pReq;
}
static void *mndBuildDisableVnodeWriteReq(SMnode *pMnode, SDbObj *pDb, int32_t vgId, int32_t *pContLen) {
SDisableVnodeWriteReq disableReq = {
.vgId = vgId,
.disable = 1,
};
mInfo("vgId:%d, build disable vnode write req", vgId);
int32_t contLen = tSerializeSDisableVnodeWriteReq(NULL, 0, &disableReq);
if (contLen < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
void *pReq = taosMemoryMalloc(contLen);
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
tSerializeSDisableVnodeWriteReq(pReq, contLen, &disableReq);
*pContLen = contLen;
return pReq;
}
static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, SVgObj *pVgroup, int32_t dstVgId, int32_t *pContLen) {
SAlterVnodeHashRangeReq alterReq = {
.srcVgId = pVgroup->vgId,
.dstVgId = dstVgId,
.hashBegin = pVgroup->hashBegin,
.hashEnd = pVgroup->hashEnd,
};
mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, begin:%u, end:%u", pVgroup->vgId, dstVgId,
pVgroup->hashBegin, pVgroup->hashEnd);
int32_t contLen = tSerializeSAlterVnodeHashRangeReq(NULL, 0, &alterReq);
if (contLen < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
void *pReq = taosMemoryMalloc(contLen);
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
tSerializeSAlterVnodeHashRangeReq(pReq, contLen, &alterReq);
*pContLen = contLen;
return pReq;
}
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
SDropVnodeReq dropReq = {0};
dropReq.dnodeId = pDnode->id;
......@@ -1029,6 +1079,7 @@ int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pD
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
mInfo("vgId:%d, build alter vnode confirm req", pVgroup->vgId);
int32_t contLen = sizeof(SMsgHead);
SMsgHead *pHead = taosMemoryMalloc(contLen);
if (pHead == NULL) {
......@@ -1053,7 +1104,25 @@ int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pD
return 0;
}
int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) { return 0; }
static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t dstVgId) {
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
int32_t contLen = 0;
void *pReq = mndBuildAlterVnodeHashRangeReq(pMnode, pVgroup, dstVgId, &contLen);
if (pReq == NULL) return -1;
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_VND_ALTER_HASHRANGE;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
return 0;
}
int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
STransAction action = {0};
......@@ -1099,6 +1168,30 @@ int32_t mndAddAlterVnodeReplicaAction(SMnode *pMnode, STrans *pTrans, SDbObj *pD
return 0;
}
static int32_t mndAddDisableVnodeWriteAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
if (pDnode == NULL) return -1;
STransAction action = {0};
action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode);
int32_t contLen = 0;
void *pReq = mndBuildDisableVnodeWriteReq(pMnode, pDb, pVgroup->vgId, &contLen);
if (pReq == NULL) return -1;
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_VND_DISABLE_WRITE;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
return 0;
}
int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
bool isRedo) {
STransAction action = {0};
......@@ -1765,6 +1858,8 @@ static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans,
static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) {
int32_t code = -1;
STrans *pTrans = NULL;
SSdbRaw *pRaw = NULL;
SDbObj dbObj = {0};
SArray *pArray = mndBuildDnodesArray(pMnode, 0);
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "split-vgroup");
......@@ -1784,18 +1879,21 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg1, pArray) != 0) goto _OVER;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId) != 0) goto _OVER;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg1, &newVg1.vnodeGid[1]) != 0) goto _OVER;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
} else if (newVg1.replica == 3) {
SVnodeGid del1 = {0};
if (mndRemoveVnodeFromVgroup(pMnode, pTrans, &newVg1, pArray, &del1) != 0) goto _OVER;
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg1, &del1, true) != 0) goto _OVER;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId) != 0) goto _OVER;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId) != 0) goto _OVER;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
} else {
goto _OVER;
}
for (int32_t i = 0; i < newVg1.replica; ++i) {
if (mndAddDisableVnodeWriteAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[i].dnodeId) != 0) goto _OVER;
}
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
mInfo("vgId:%d, vgroup info after adjust replica, replica:%d hashBegin:%u hashEnd:%u vnode:0 dnode:%d", newVg1.vgId,
newVg1.replica, newVg1.hashBegin, newVg1.hashEnd, newVg1.vnodeGid[0].dnodeId);
for (int32_t i = 0; i < newVg1.replica; ++i) {
......@@ -1815,13 +1913,23 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
mInfo("vgId:%d, vgroup info after adjust hash, replica:%d hashBegin:%u hashEnd:%u vnode:0 dnode:%d", newVg1.vgId,
newVg1.replica, newVg1.hashBegin, newVg1.hashEnd, newVg1.vnodeGid[0].dnodeId);
for (int32_t i = 0; i < newVg1.replica; ++i) {
mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
}
mInfo("vgId:%d, vgroup info after adjust hash, replica:%d hashBegin:%u hashEnd:%u vnode:0 dnode:%d", newVg2.vgId,
newVg2.replica, newVg2.hashBegin, newVg2.hashEnd, newVg2.vnodeGid[0].dnodeId);
for (int32_t i = 0; i < newVg1.replica; ++i) {
mInfo("vgId:%d, vnode:%d dnode:%d", newVg2.vgId, i, newVg2.vnodeGid[i].dnodeId);
}
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, pDb, &newVg2) != 0) goto _OVER;
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, &newVg1, maxVgId) != 0) goto _OVER;
newVg1.vgId = maxVgId;
maxVgId++;
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, &newVg2, maxVgId) != 0) goto _OVER;
newVg2.vgId = maxVgId;
#if 0
// adjust vgroup replica
if (pDb->cfg.replications != newVg1.replica) {
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray) != 0) goto _OVER;
......@@ -1829,38 +1937,38 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
if (pDb->cfg.replications != newVg2.replica) {
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray) != 0) goto _OVER;
}
#endif
{
SSdbRaw *pRaw = mndVgroupActionEncode(&newVg1);
if (pRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
sdbFreeRaw(pRaw);
return -1;
}
pRaw = mndVgroupActionEncode(&newVg1);
if (pRaw == NULL) goto _OVER;
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
}
pRaw = NULL;
{
SSdbRaw *pRaw = mndVgroupActionEncode(&newVg2);
if (pRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
sdbFreeRaw(pRaw);
return -1;
}
pRaw = mndVgroupActionEncode(&newVg2);
if (pRaw == NULL) goto _OVER;
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
}
pRaw = NULL;
mInfo("vgId:%d, vgroup info after adjust hash, replica:%d hashBegin:%u hashEnd:%u vnode:0 dnode:%d", newVg1.vgId,
newVg1.replica, newVg1.hashBegin, newVg1.hashEnd, newVg1.vnodeGid[0].dnodeId);
for (int32_t i = 0; i < newVg1.replica; ++i) {
mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
}
mInfo("vgId:%d, vgroup info after adjust hash, replica:%d hashBegin:%u hashEnd:%u vnode:0 dnode:%d", newVg2.vgId,
newVg2.replica, newVg2.hashBegin, newVg2.hashEnd, newVg2.vnodeGid[0].dnodeId);
for (int32_t i = 0; i < newVg1.replica; ++i) {
mInfo("vgId:%d, vnode:%d dnode:%d", newVg2.vgId, i, newVg2.vnodeGid[i].dnodeId);
}
pRaw = mndVgroupActionEncode(pVgroup);
if (pRaw == NULL) goto _OVER;
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
(void)sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
pRaw = NULL;
memcpy(&dbObj, pDb, sizeof(SDbObj));
if (dbObj.cfg.pRetensions != NULL) {
dbObj.cfg.pRetensions = taosArrayDup(pDb->cfg.pRetensions, NULL);
if (dbObj.cfg.pRetensions == NULL) goto _OVER;
}
dbObj.vgVersion++;
dbObj.updateTime = taosGetTimestampMs();
dbObj.cfg.numOfVgroups++;
pRaw = mndDbActionEncode(&dbObj);
if (pRaw == NULL) goto _OVER;
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
pRaw = NULL;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
......@@ -1868,22 +1976,29 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
_OVER:
taosArrayDestroy(pArray);
mndTransDrop(pTrans);
sdbFreeRaw(pRaw);
taosArrayDestroy(dbObj.cfg.pRetensions);
return code;
}
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
int32_t vgId = 2;
SVgObj *pVgroup = NULL;
SDbObj *pDb = NULL;
mInfo("vgId:%d, start to split", vgId);
SSplitVgroupReq req = {0};
if (tDeserializeSSplitVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
mInfo("vgId:%d, start to split", req.vgId);
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_SPLIT_VGROUP) != 0) {
goto _OVER;
}
pVgroup = mndAcquireVgroup(pMnode, vgId);
pVgroup = mndAcquireVgroup(pMnode, req.vgId);
if (pVgroup == NULL) goto _OVER;
pDb = mndAcquireDb(pMnode, pVgroup->dbName);
......
......@@ -50,13 +50,16 @@ extern const SVnodeCfg vnodeCfgDefault;
int32_t vnodeInit(int32_t nthreads);
void vnodeCleanup();
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs);
int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs);
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs);
void vnodeDestroy(const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
void vnodePreClose(SVnode *pVnode);
void vnodePostClose(SVnode *pVnode);
void vnodeSyncCheckTimeout(SVnode *pVnode);
void vnodeClose(SVnode *pVnode);
int32_t vnodeSyncCommit(SVnode *pVnode);
int32_t vnodeBegin(SVnode* pVnode);
int32_t vnodeStart(SVnode *pVnode);
void vnodeStop(SVnode *pVnode);
......
......@@ -56,4 +56,7 @@ int metaPrepareAsyncCommit(SMeta *pMeta) {
}
// abort the meta txn
int metaAbort(SMeta *pMeta) { return tdbAbort(pMeta->pEnv, pMeta->txn); }
int metaAbort(SMeta *pMeta) {
if (!pMeta->txn) return 0;
return tdbAbort(pMeta->pEnv, pMeta->txn);
}
......@@ -203,7 +203,7 @@ _err:
int metaClose(SMeta *pMeta) {
if (pMeta) {
if (pMeta->pEnv) tdbAbort(pMeta->pEnv, pMeta->txn);
if (pMeta->pEnv) metaAbort(pMeta);
if (pMeta->pCache) metaCacheClose(pMeta);
if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb);
......
......@@ -219,10 +219,10 @@ void vnodeBufPoolAddToFreeList(SVBufPool *pPool) {
if (pPool->node.size != size) {
SVBufPool *pNewPool = NULL;
if (vnodeBufPoolCreate(pVnode, pPool->id, size, &pNewPool) < 0) {
vWarn("vgId:%d failed to change buffer pool of id %d size from %" PRId64 " to %" PRId64 " since %s",
vWarn("vgId:%d, failed to change buffer pool of id %d size from %" PRId64 " to %" PRId64 " since %s",
TD_VID(pVnode), pPool->id, pPool->node.size, size, tstrerror(errno));
} else {
vInfo("vgId:%d buffer pool of id %d size changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->id,
vInfo("vgId:%d, buffer pool of id %d size changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->id,
pPool->node.size, size);
vnodeBufPoolDestroy(pPool);
......@@ -232,7 +232,7 @@ void vnodeBufPoolAddToFreeList(SVBufPool *pPool) {
}
// add to free list
vDebug("vgId:%d buffer pool %p of id %d is added to free list", TD_VID(pVnode), pPool, pPool->id);
vDebug("vgId:%d, buffer pool %p of id %d is added to free list", TD_VID(pVnode), pPool, pPool->id);
vnodeBufPoolReset(pPool);
pPool->freeNext = pVnode->freeList;
pVnode->freeList = pPool;
......@@ -307,7 +307,7 @@ int32_t vnodeBufPoolRecycle(SVBufPool *pPool) {
SVnode *pVnode = pPool->pVnode;
vDebug("vgId:%d recycle buffer pool %p of id %d", TD_VID(pVnode), pPool, pPool->id);
vDebug("vgId:%d, recycle buffer pool %p of id %d", TD_VID(pVnode), pPool, pPool->id);
taosThreadMutexLock(&pPool->mutex);
......
......@@ -28,10 +28,10 @@ static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) {
if (pVnode->onRecycle == NULL) {
if (pVnode->recycleHead == NULL) {
vDebug("vgId:%d no recyclable buffer pool", TD_VID(pVnode));
vDebug("vgId:%d, no recyclable buffer pool", TD_VID(pVnode));
goto _exit;
} else {
vDebug("vgId:%d buffer pool %p of id %d on recycle queue, try to recycle", TD_VID(pVnode), pVnode->recycleHead,
vDebug("vgId:%d, buffer pool %p of id %d on recycle queue, try to recycle", TD_VID(pVnode), pVnode->recycleHead,
pVnode->recycleHead->id);
pVnode->onRecycle = pVnode->recycleHead;
......@@ -50,7 +50,7 @@ static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) {
_exit:
if (code) {
vError("vgId:%d %s failed since %s", TD_VID(pVnode), __func__, tstrerror(code));
vError("vgId:%d, %s failed since %s", TD_VID(pVnode), __func__, tstrerror(code));
}
return code;
}
......@@ -65,7 +65,7 @@ static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) {
++nTry;
if (pVnode->freeList) {
vDebug("vgId:%d allocate free buffer pool on %d try, pPool:%p id:%d", TD_VID(pVnode), nTry, pVnode->freeList,
vDebug("vgId:%d, allocate free buffer pool on %d try, pPool:%p id:%d", TD_VID(pVnode), nTry, pVnode->freeList,
pVnode->freeList->id);
pVnode->inUse = pVnode->freeList;
......@@ -74,13 +74,13 @@ static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) {
pVnode->inUse->freeNext = NULL;
break;
} else {
vDebug("vgId:%d no free buffer pool on %d try, try to recycle...", TD_VID(pVnode), nTry);
vDebug("vgId:%d, no free buffer pool on %d try, try to recycle...", TD_VID(pVnode), nTry);
code = vnodeTryRecycleBufPool(pVnode);
TSDB_CHECK_CODE(code, lino, _exit);
if (pVnode->freeList == NULL) {
vDebug("vgId:%d no free buffer pool on %d try, wait %d ms...", TD_VID(pVnode), nTry, WAIT_TIME_MILI_SEC);
vDebug("vgId:%d, no free buffer pool on %d try, wait %d ms...", TD_VID(pVnode), nTry, WAIT_TIME_MILI_SEC);
struct timeval tv;
struct timespec ts;
......@@ -105,7 +105,7 @@ static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) {
_exit:
taosThreadMutexUnlock(&pVnode->mutex);
if (code) {
vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
return code;
}
......@@ -140,7 +140,7 @@ int vnodeBegin(SVnode *pVnode) {
_exit:
if (code) {
terrno = code;
vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
return code;
}
......@@ -351,7 +351,7 @@ static void vnodeReturnBufPool(SVnode *pVnode) {
if (nRef == 0) {
vnodeBufPoolAddToFreeList(pPool);
} else if (nRef > 0) {
vDebug("vgId:%d buffer pool %p of id %d is added to recycle queue", TD_VID(pVnode), pPool, pPool->id);
vDebug("vgId:%d, buffer pool %p of id %d is added to recycle queue", TD_VID(pVnode), pPool, pPool->id);
if (pVnode->recycleTail == NULL) {
pPool->recyclePrev = pPool->recycleNext = NULL;
......
......@@ -58,7 +58,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
return 0;
}
int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) {
int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) {
SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN] = {0};
int32_t ret = 0;
......@@ -107,6 +107,117 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) {
return 0;
}
int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, STfs *pTfs) {
int32_t ret = tfsRename(pTfs, srcPath, dstPath);
if (ret != 0) return ret;
char oldRname[TSDB_FILENAME_LEN] = {0};
char newRname[TSDB_FILENAME_LEN] = {0};
char tsdbPath[TSDB_FILENAME_LEN] = {0};
char tsdbFilePrefix[TSDB_FILENAME_LEN] = {0};
snprintf(tsdbPath, TSDB_FILENAME_LEN, "%s%stsdb", dstPath, TD_DIRSEP);
snprintf(tsdbFilePrefix, TSDB_FILENAME_LEN, "tsdb%sv", TD_DIRSEP);
STfsDir *tsdbDir = tfsOpendir(pTfs, tsdbPath);
if (tsdbDir == NULL) return 0;
while (1) {
const STfsFile *tsdbFile = tfsReaddir(tsdbDir);
if (tsdbFile == NULL) break;
if (tsdbFile->rname == NULL) continue;
tstrncpy(oldRname, tsdbFile->rname, TSDB_FILENAME_LEN);
char *tsdbFilePrefixPos = strstr(oldRname, tsdbFilePrefix);
if (tsdbFilePrefixPos == NULL) continue;
int32_t tsdbFileVgId = atoi(tsdbFilePrefixPos + 6);
if (tsdbFileVgId == srcVgId) {
char *tsdbFileSurfixPos = strstr(tsdbFilePrefixPos, "f");
if (tsdbFileSurfixPos == NULL) continue;
tsdbFilePrefixPos[6] = 0;
snprintf(newRname, TSDB_FILENAME_LEN, "%s%d%s", oldRname, dstVgId, tsdbFileSurfixPos);
vInfo("vgId:%d, rename file from %s to %s", dstVgId, tsdbFile->rname, newRname);
ret = tfsRename(pTfs, tsdbFile->rname, newRname);
if (ret != 0) {
vInfo("vgId:%d, failed to rename file from %s to %s since %s", dstVgId, tsdbFile->rname, newRname, terrstr());
tfsClosedir(tsdbDir);
return ret;
}
}
}
tfsClosedir(tsdbDir);
return 0;
}
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs) {
SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN] = {0};
int32_t ret = 0;
if (pTfs) {
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, srcPath);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", srcPath);
}
// todo add stat file to handle exception while vnode open
ret = vnodeLoadInfo(dir, &info);
if (ret < 0) {
vError("vgId:%d, failed to read vnode config from %s since %s", pReq->srcVgId, srcPath, tstrerror(terrno));
return -1;
}
vInfo("vgId:%d, start to alter hashrange from [%u, %u) to [%u, %u)", pReq->srcVgId, info.config.hashBegin,
info.config.hashEnd, pReq->hashBegin, pReq->hashEnd);
info.config.vgId = pReq->dstVgId;
info.config.hashBegin = pReq->hashBegin;
info.config.hashEnd = pReq->hashEnd;
info.config.walCfg.vgId = pReq->dstVgId;
SSyncCfg *pCfg = &info.config.syncCfg;
pCfg->myIndex = 0;
pCfg->replicaNum = 1;
memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo));
vInfo("vgId:%d, alter vnode replicas to 1", pReq->srcVgId);
SNodeInfo *pNode = &pCfg->nodeInfo[0];
pNode->nodePort = tsServerPort;
tstrncpy(pNode->nodeFqdn, tsLocalFqdn, TSDB_FQDN_LEN);
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
vInfo("vgId:%d, ep:%s:%u dnode:%d", pReq->srcVgId, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
info.config.syncCfg = *pCfg;
ret = vnodeSaveInfo(dir, &info);
if (ret < 0) {
vError("vgId:%d, failed to save vnode config since %s", pReq->dstVgId, tstrerror(terrno));
return -1;
}
ret = vnodeCommitInfo(dir, &info);
if (ret < 0) {
vError("vgId:%d, failed to commit vnode config since %s", pReq->dstVgId, tstrerror(terrno));
return -1;
}
vInfo("vgId:%d, start to rename %s to %s", pReq->dstVgId, srcPath, dstPath);
ret = vnodeRenameVgroupId(srcPath, dstPath, pReq->srcVgId, pReq->dstVgId, pTfs);
if (ret < 0) {
vError("vgId:%d, failed to rename vnode from %s to %s since %s", pReq->dstVgId, srcPath, dstPath,
tstrerror(terrno));
return -1;
}
// todo vnode compact here
vInfo("vgId:%d, vnode hashrange is altered", info.config.vgId);
return 0;
}
void vnodeDestroy(const char *path, STfs *pTfs) {
vInfo("path:%s is removed while destroy vnode", path);
tfsRmdir(pTfs, path);
......
......@@ -24,7 +24,6 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
......@@ -313,9 +312,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
case TDMT_VND_ALTER_CONFIRM:
vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp);
break;
case TDMT_VND_ALTER_HASHRANGE:
vnodeProcessAlterHashRangeReq(pVnode, version, pReq, len, pRsp);
break;
case TDMT_VND_ALTER_CONFIG:
vnodeProcessAlterConfigReq(pVnode, version, pReq, len, pRsp);
break;
......@@ -1246,16 +1242,6 @@ static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void
return 0;
}
static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
vInfo("vgId:%d, alter hashrange msg will be processed", TD_VID(pVnode));
// todo
// 1. stop work
// 2. adjust hash range / compact / remove wals / rename vgroups
// 3. reload sync
return 0;
}
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
bool walChanged = false;
bool tsdbChanged = false;
......
......@@ -445,6 +445,11 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
SPgno journalSize = 0;
int ret;
if (pTxn->jfd == 0) {
// txn is commited
return 0;
}
// sync the journal file
ret = tdbOsFSync(pTxn->jfd);
if (ret < 0) {
......
......@@ -303,7 +303,7 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname) {
return 0;
}
int32_t tfsRename(STfs *pTfs, char *orname, char *nrname) {
int32_t tfsRename(STfs *pTfs, const char *orname, const char *nrname) {
char oaname[TMPNAME_LEN] = "\0";
char naname[TMPNAME_LEN] = "\0";
......
......@@ -11,7 +11,6 @@ system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
sql connect
sql create user u1 pass 'taosdata'
print =============== step1 create dnode2
sql create dnode $hostname port 7200
......@@ -73,8 +72,21 @@ print =============== step3: create database
sql use d1
sql create table d1.st (ts timestamp, i int) tags (j int)
sql create table d1.c1 using st tags(1)
sql create table d1.c2 using st tags(2)
sql create table d1.c3 using st tags(3)
sql create table d1.c4 using st tags(4)
sql create table d1.c5 using st tags(5)
sql insert into d1.c1 values (now, 1);
sql insert into d1.c2 values (now, 2);
sql insert into d1.c3 values (now, 3);
sql insert into d1.c4 values (now, 4);
sql insert into d1.c5 values (now, 5);
sql show d1.tables
if $rows != 1 then
if $rows != 5 then
return -1
endi
sql select * from d1.st
if $rows != 5 then
return -1
endi
......@@ -82,6 +94,34 @@ print =============== step4: split
print split vgroup 2
sql split vgroup 2
print =============== step5: check split result
sql show d1.tables
#if $rows != 5 then
# return -1
#endi
#sql select * from d1.st
#if $rows != 5 then
# return -1
#endi
print =============== step6: create tables
sql create table d1.c6 using st tags(6)
sql create table d1.c7 using st tags(7)
sql create table d1.c8 using st tags(8)
sql create table d1.c9 using st tags(9)
sql insert into d1.c6 values (now, 6);
sql insert into d1.c7 values (now, 7);
sql insert into d1.c8 values (now, 8);
sql insert into d1.c9 values (now, 9);
sql show d1.tables
#if $rows != 9 then
# return -1
#endi
#sql select * from d1.st
#if $rows != 9 then
# return -1
#endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册