提交 03dc6e55 编写于 作者: S Shengliang Guan

enh: add disable vnode write request

上级 f36713a5
......@@ -1279,6 +1279,14 @@ typedef struct {
int32_t tSerializeSAlterVnodeReplicaReq(void* buf, int32_t bufLen, SAlterVnodeReplicaReq* pReq);
int32_t tDeserializeSAlterVnodeReplicaReq(void* buf, int32_t bufLen, SAlterVnodeReplicaReq* pReq);
typedef struct {
int32_t vgId;
int8_t disable;
} SDisableVnodeWriteReq;
int32_t tSerializeSDisableVnodeWriteReq(void* buf, int32_t bufLen, SDisableVnodeWriteReq* pReq);
int32_t tDeserializeSDisableVnodeWriteReq(void* buf, int32_t bufLen, SDisableVnodeWriteReq* pReq);
typedef struct {
SMsgHead header;
char dbFName[TSDB_DB_FNAME_LEN];
......
......@@ -220,6 +220,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "vnode-drop-ttl-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TRIM, "vnode-trim", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_COMMIT, "vnode-commit", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DISABLE_WRITE, "vnode-disable-write", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_SCH_MSG)
......
......@@ -4118,6 +4118,34 @@ int32_t tDeserializeSAlterVnodeReplicaReq(void *buf, int32_t bufLen, SAlterVnode
return 0;
}
int32_t tSerializeSDisableVnodeWriteReq(void *buf, int32_t bufLen, SDisableVnodeWriteReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1;
if (tEncodeI8(&encoder, pReq->disable) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSDisableVnodeWriteReq(void *buf, int32_t bufLen, SDisableVnodeWriteReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->disable) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSKillQueryReq(void *buf, int32_t bufLen, SKillQueryReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
......
......@@ -181,6 +181,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
......
......@@ -54,6 +54,7 @@ typedef struct {
int32_t vgVersion;
int32_t refCount;
int8_t dropped;
int8_t disable;
char *path;
SVnode *pImpl;
SMultiWorker pWriteW;
......@@ -87,6 +88,7 @@ SArray *vmGetMsgHandles();
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
// vmFile.c
int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
......
......@@ -281,6 +281,27 @@ _OVER:
return code;
}
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SDisableVnodeWriteReq req = {0};
if (tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
dInfo("vgId:%d, vnode write disable:%d", req.vgId, req.disable);
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
if (pVnode == NULL) {
dError("vgId:%d, failed to disable write since %s", req.vgId, terrstr());
terrno = TSDB_CODE_VND_NOT_EXIST;
return -1;
}
pVnode->disable = req.disable;
vmReleaseVnode(pMgmt, pVnode);
return 0;
}
int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SAlterVnodeReplicaReq alterReq = {0};
if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
......@@ -289,16 +310,16 @@ int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
int32_t vgId = alterReq.vgId;
dInfo("vgId:%d, start to alter vnode, replica:%d selfIndex:%d strict:%d", alterReq.vgId, alterReq.replica,
alterReq.selfIndex, alterReq.strict);
dInfo("vgId:%d, start to alter vnode, replica:%d selfIndex:%d strict:%d", vgId, alterReq.replica, alterReq.selfIndex,
alterReq.strict);
for (int32_t i = 0; i < alterReq.replica; ++i) {
SReplica *pReplica = &alterReq.replicas[i];
dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", alterReq.vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
}
if (alterReq.replica <= 0 || alterReq.selfIndex < 0 || alterReq.selfIndex >= alterReq.replica) {
terrno = TSDB_CODE_INVALID_MSG;
dError("vgId:%d, failed to alter replica since invalid msg", alterReq.vgId);
dError("vgId:%d, failed to alter replica since invalid msg", vgId);
return -1;
}
......@@ -306,7 +327,7 @@ int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", alterReq.vgId, pReplica->id, pReplica->fqdn,
dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn,
pReplica->port);
return -1;
}
......@@ -451,6 +472,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
......@@ -43,6 +43,9 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
case TDMT_VND_ALTER_REPLICA:
code = vmProcessAlterVnodeReq(pMgmt, pMsg);
break;
case TDMT_VND_DISABLE_WRITE:
code = vmProcessDisableVnodeWriteReq(pMgmt, pMsg);
break;
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
dGError("msg:%p, not processed in vnode-mgmt queue", pMsg);
......@@ -191,14 +194,21 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
terrno = TSDB_CODE_NO_DISKSPACE;
code = terrno;
dError("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr(code));
} else if ((pMsg->msgType == TDMT_VND_SUBMIT) && (grantCheck(TSDB_GRANT_STORAGE) != TSDB_CODE_SUCCESS)) {
break;
}
if (pMsg->msgType == TDMT_VND_SUBMIT && (grantCheck(TSDB_GRANT_STORAGE) != TSDB_CODE_SUCCESS)) {
terrno = TSDB_CODE_VND_NO_WRITE_AUTH;
code = terrno;
dDebug("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr(code));
} else {
dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pWriteW.queue, pMsg);
break;
}
if (pMsg->msgType != TDMT_VND_ALTER_CONFIRM && pVnode->disable) {
dDebug("vgId:%d, msg:%p put into vnode-write queue failed since its disable", pVnode->vgId, pMsg);
terrno = TSDB_CODE_VND_STOPPED;
break;
}
dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pWriteW.queue, pMsg);
break;
case SYNC_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg);
......
......@@ -39,7 +39,7 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
if (msgFp == NULL) {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
dGError("msg:%p, not processed since no handler", pMsg);
dGError("msg:%p, not processed since no handler, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
return -1;
}
......
......@@ -59,6 +59,7 @@ int32_t mndInitVgroup(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_HASHRANGE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DISABLE_WRITE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_REDISTRIBUTE_VGROUP, mndProcessRedistributeVgroupMsg);
mndSetMsgHandle(pMnode, TDMT_MND_SPLIT_VGROUP, mndProcessSplitVgroupMsg);
......@@ -355,9 +356,7 @@ static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *p
SReplica *pReplica = &alterReq.replicas[v];
SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pVgidDnode == NULL) {
return NULL;
}
if (pVgidDnode == NULL) return NULL;
pReplica->id = pVgidDnode->id;
pReplica->port = pVgidDnode->port;
......@@ -397,6 +396,30 @@ static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *p
return pReq;
}
static void *mndBuildDisableVnodeWriteReq(SMnode *pMnode, SDbObj *pDb, int32_t vgId, int32_t *pContLen) {
SDisableVnodeWriteReq disableReq = {
.vgId = vgId,
.disable = 1,
};
mInfo("vgId:%d, build disable vnode write req", vgId);
int32_t contLen = tSerializeSDisableVnodeWriteReq(NULL, 0, &disableReq);
if (contLen < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
void *pReq = taosMemoryMalloc(contLen);
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
tSerializeSDisableVnodeWriteReq(pReq, contLen, &disableReq);
*pContLen = contLen;
return pReq;
}
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
SDropVnodeReq dropReq = {0};
dropReq.dnodeId = pDnode->id;
......@@ -1029,6 +1052,7 @@ int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pD
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
mInfo("vgId:%d, build alter vnode confirm req", pVgroup->vgId);
int32_t contLen = sizeof(SMsgHead);
SMsgHead *pHead = taosMemoryMalloc(contLen);
if (pHead == NULL) {
......@@ -1099,6 +1123,30 @@ int32_t mndAddAlterVnodeReplicaAction(SMnode *pMnode, STrans *pTrans, SDbObj *pD
return 0;
}
static int32_t mndAddDisableVnodeWriteAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
if (pDnode == NULL) return -1;
STransAction action = {0};
action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode);
int32_t contLen = 0;
void *pReq = mndBuildDisableVnodeWriteReq(pMnode, pDb, pVgroup->vgId, &contLen);
if (pReq == NULL) return -1;
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_VND_DISABLE_WRITE;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
return 0;
}
int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
bool isRedo) {
STransAction action = {0};
......@@ -1785,18 +1833,21 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg1, pArray) != 0) goto _OVER;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId) != 0) goto _OVER;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg1, &newVg1.vnodeGid[1]) != 0) goto _OVER;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
} else if (newVg1.replica == 3) {
SVnodeGid del1 = {0};
if (mndRemoveVnodeFromVgroup(pMnode, pTrans, &newVg1, pArray, &del1) != 0) goto _OVER;
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg1, &del1, true) != 0) goto _OVER;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[0].dnodeId) != 0) goto _OVER;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[1].dnodeId) != 0) goto _OVER;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
} else {
goto _OVER;
}
for (int32_t i = 0; i < newVg1.replica; ++i) {
if (mndAddDisableVnodeWriteAction(pMnode, pTrans, pDb, &newVg1, newVg1.vnodeGid[i].dnodeId) != 0) goto _OVER;
}
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
mInfo("vgId:%d, vgroup info after adjust replica, replica:%d hashBegin:%u hashEnd:%u vnode:0 dnode:%d", newVg1.vgId,
newVg1.replica, newVg1.hashBegin, newVg1.hashEnd, newVg1.vnodeGid[0].dnodeId);
for (int32_t i = 0; i < newVg1.replica; ++i) {
......@@ -1822,8 +1873,14 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
mInfo("vgId:%d, vgroup info after adjust hash, replica:%d hashBegin:%u hashEnd:%u vnode:0 dnode:%d", newVg1.vgId,
newVg1.replica, newVg1.hashBegin, newVg1.hashEnd, newVg1.vnodeGid[0].dnodeId);
for (int32_t i = 0; i < newVg1.replica; ++i) {
mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
}
mInfo("vgId:%d, vgroup info after adjust hash, replica:%d hashBegin:%u hashEnd:%u vnode:0 dnode:%d", newVg2.vgId,
newVg2.replica, newVg2.hashBegin, newVg2.hashEnd, newVg2.vnodeGid[0].dnodeId);
for (int32_t i = 0; i < newVg1.replica; ++i) {
mInfo("vgId:%d, vnode:%d dnode:%d", newVg2.vgId, i, newVg2.vnodeGid[i].dnodeId);
}
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, pDb, &newVg2) != 0) goto _OVER;
......@@ -1854,17 +1911,6 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
(void)sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
pRaw = NULL;
mInfo("vgId:%d, vgroup info after adjust hash, replica:%d hashBegin:%u hashEnd:%u vnode:0 dnode:%d", newVg1.vgId,
newVg1.replica, newVg1.hashBegin, newVg1.hashEnd, newVg1.vnodeGid[0].dnodeId);
for (int32_t i = 0; i < newVg1.replica; ++i) {
mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
}
mInfo("vgId:%d, vgroup info after adjust hash, replica:%d hashBegin:%u hashEnd:%u vnode:0 dnode:%d", newVg2.vgId,
newVg2.replica, newVg2.hashBegin, newVg2.hashEnd, newVg2.vnodeGid[0].dnodeId);
for (int32_t i = 0; i < newVg1.replica; ++i) {
mInfo("vgId:%d, vnode:%d dnode:%d", newVg2.vgId, i, newVg2.vnodeGid[i].dnodeId);
}
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册