提交 980e289c 编写于 作者: S Shengliang Guan

refactor: add alter-confirm while alter db

上级 a4166b73
...@@ -193,13 +193,11 @@ enum { ...@@ -193,13 +193,11 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp)
TD_DEF_MSG_TYPE(TDMT_VND_GET_TSMA_EXP_WNDS, "vnode-get-tsma-expired-windows", SVGetTsmaExpWndsReq, SVGetTsmaExpWndsRsp) TD_DEF_MSG_TYPE(TDMT_VND_GET_TSMA_EXP_WNDS, "vnode-get-tsma-expired-windows", SVGetTsmaExpWndsReq, SVGetTsmaExpWndsRsp)
TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "alter-replica", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "alter-replica", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_START_WRITE, "start-write", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIRM, "alter-confirm", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STOP_WRITE, "stop-write", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CONFIRM_WRITE, "confirm-write", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp)
TD_NEW_MSG_SEG(TDMT_QND_MSG) TD_NEW_MSG_SEG(TDMT_QND_MSG)
......
...@@ -213,6 +213,7 @@ SArray *mmGetMsgHandles() { ...@@ -213,6 +213,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
......
...@@ -361,6 +361,7 @@ SArray *vmGetMsgHandles() { ...@@ -361,6 +361,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
......
...@@ -34,7 +34,7 @@ int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup); ...@@ -34,7 +34,7 @@ int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup);
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups); int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups);
SArray *mndBuildDnodesArray(SMnode *pMnode); SArray *mndBuildDnodesArray(SMnode *pMnode);
int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray); int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray);
int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray, SVnodeGid *del1, SVnodeGid *del2); int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray, SVnodeGid *pVgId);
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, bool standby); void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, bool standby);
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
......
...@@ -30,25 +30,25 @@ static int32_t mndBnodeActionInsert(SSdb *pSdb, SBnodeObj *pObj); ...@@ -30,25 +30,25 @@ static int32_t mndBnodeActionInsert(SSdb *pSdb, SBnodeObj *pObj);
static int32_t mndBnodeActionUpdate(SSdb *pSdb, SBnodeObj *pOld, SBnodeObj *pNew); static int32_t mndBnodeActionUpdate(SSdb *pSdb, SBnodeObj *pOld, SBnodeObj *pNew);
static int32_t mndBnodeActionDelete(SSdb *pSdb, SBnodeObj *pObj); static int32_t mndBnodeActionDelete(SSdb *pSdb, SBnodeObj *pObj);
static int32_t mndProcessCreateBnodeReq(SRpcMsg *pReq); static int32_t mndProcessCreateBnodeReq(SRpcMsg *pReq);
static int32_t mndProcessCreateBnodeRsp(SRpcMsg *pRsp);
static int32_t mndProcessDropBnodeReq(SRpcMsg *pReq); static int32_t mndProcessDropBnodeReq(SRpcMsg *pReq);
static int32_t mndProcessDropBnodeRsp(SRpcMsg *pRsp);
static int32_t mndRetrieveBnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveBnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextBnode(SMnode *pMnode, void *pIter); static void mndCancelGetNextBnode(SMnode *pMnode, void *pIter);
int32_t mndInitBnode(SMnode *pMnode) { int32_t mndInitBnode(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_BNODE, SSdbTable table = {
.keyType = SDB_KEY_INT32, .sdbType = SDB_BNODE,
.encodeFp = (SdbEncodeFp)mndBnodeActionEncode, .keyType = SDB_KEY_INT32,
.decodeFp = (SdbDecodeFp)mndBnodeActionDecode, .encodeFp = (SdbEncodeFp)mndBnodeActionEncode,
.insertFp = (SdbInsertFp)mndBnodeActionInsert, .decodeFp = (SdbDecodeFp)mndBnodeActionDecode,
.updateFp = (SdbUpdateFp)mndBnodeActionUpdate, .insertFp = (SdbInsertFp)mndBnodeActionInsert,
.deleteFp = (SdbDeleteFp)mndBnodeActionDelete}; .updateFp = (SdbUpdateFp)mndBnodeActionUpdate,
.deleteFp = (SdbDeleteFp)mndBnodeActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_BNODE, mndProcessCreateBnodeReq); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_BNODE, mndProcessCreateBnodeReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_BNODE, mndProcessDropBnodeReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_BNODE, mndProcessDropBnodeReq);
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_BNODE_RSP, mndProcessCreateBnodeRsp); mndSetMsgHandle(pMnode, TDMT_DND_CREATE_BNODE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_DND_DROP_BNODE_RSP, mndProcessDropBnodeRsp); mndSetMsgHandle(pMnode, TDMT_DND_DROP_BNODE_RSP, mndTransProcessRsp);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_BNODE, mndRetrieveBnodes); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_BNODE, mndRetrieveBnodes);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_BNODE, mndCancelGetNextBnode); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_BNODE, mndCancelGetNextBnode);
...@@ -427,16 +427,6 @@ _OVER: ...@@ -427,16 +427,6 @@ _OVER:
return code; return code;
} }
static int32_t mndProcessCreateBnodeRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndProcessDropBnodeRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndRetrieveBnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { static int32_t mndRetrieveBnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
......
...@@ -263,7 +263,8 @@ void mndReleaseDb(SMnode *pMnode, SDbObj *pDb) { ...@@ -263,7 +263,8 @@ void mndReleaseDb(SMnode *pMnode, SDbObj *pDb) {
sdbRelease(pSdb, pDb); sdbRelease(pSdb, pDb);
} }
static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, bool standby) { static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
bool standby) {
STransAction action = {0}; STransAction action = {0};
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
...@@ -288,6 +289,32 @@ static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *p ...@@ -288,6 +289,32 @@ static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *p
return 0; return 0;
} }
static int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
int32_t contLen = sizeof(SMsgHead);
SMsgHead *pHead = taosMemoryMalloc(contLen);
if (pHead == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(pVgroup->vgId);
action.pCont = pHead;
action.contLen = contLen;
action.msgType = TDMT_VND_ALTER_CONFIRM;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pHead);
return -1;
}
return 0;
}
static int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType) { static int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType) {
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup); action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
...@@ -415,7 +442,6 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) { ...@@ -415,7 +442,6 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = TSDB_DEFAULT_CACHE_LAST_ROW; if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = TSDB_DEFAULT_CACHE_LAST_ROW;
if (pCfg->numOfRetensions < 0) pCfg->numOfRetensions = 0; if (pCfg->numOfRetensions < 0) pCfg->numOfRetensions = 0;
if (pCfg->schemaless < 0) pCfg->schemaless = TSDB_DB_SCHEMALESS_OFF; if (pCfg->schemaless < 0) pCfg->schemaless = TSDB_DB_SCHEMALESS_OFF;
} }
static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
...@@ -726,30 +752,32 @@ static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -726,30 +752,32 @@ static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
mndTransSetSerial(pTrans); mndTransSetSerial(pTrans);
if (newVgroup.replica < pDb->cfg.replications) { if (newVgroup.replica < pDb->cfg.replications) {
mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId, mInfo("db:%s, vgId:%d, vn:0 dnode:%d, will add 2 vnodes", pVgroup->dbName, pVgroup->vgId,
pVgroup->vnodeGid[0].dnodeId); pVgroup->vnodeGid[0].dnodeId);
if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) { if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1;
mError("db:%s, failed to add vnode to vgId:%d since %s", pDb->name, newVgroup.vgId, terrstr());
return -1;
}
newVgroup.replica = pDb->cfg.replications;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[1], true) != 0) return -1; if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[1], true) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[2], true) != 0) return -1; if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[2], true) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1; if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
} else { } else {
mInfo("db:%s, vgId:%d, will remove 2 vnodes", pVgroup->dbName, pVgroup->vgId); mInfo("db:%s, vgId:%d, will remove 2 vnodes", pVgroup->dbName, pVgroup->vgId);
SVnodeGid del1 = {0}; SVnodeGid del1 = {0};
SVnodeGid del2 = {0}; if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del1) != 0) return -1;
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del1, &del2) != 0) {
mError("db:%s, failed to remove vnode from vgId:%d since %s", pDb->name, newVgroup.vgId, terrstr());
return -1;
}
newVgroup.replica = pDb->cfg.replications;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1; if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del1, true) != 0) return -1; if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del1, true) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
SVnodeGid del2 = {0};
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del2) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del2, true) != 0) return -1; if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del2, true) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
} }
SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup); SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
...@@ -1341,7 +1369,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, ...@@ -1341,7 +1369,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
int32_t numOfTable = mndGetDBTableNum(pDb, pMnode); int32_t numOfTable = mndGetDBTableNum(pDb, pMnode);
if (pDbVgVersion->vgVersion >= pDb->vgVersion && numOfTable == pDbVgVersion->numOfTable) { if (pDbVgVersion->vgVersion >= pDb->vgVersion && numOfTable == pDbVgVersion->numOfTable) {
mDebug("db:%s, version & numOfTable not changed", pDbVgVersion->dbFName); mDebug("db:%s, version and numOfTable not changed", pDbVgVersion->dbFName);
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
continue; continue;
} else { } else {
...@@ -1433,12 +1461,22 @@ const char *mndGetDbStr(const char *src) { ...@@ -1433,12 +1461,22 @@ const char *mndGetDbStr(const char *src) {
int64_t getValOfDiffPrecision(int8_t unit, int64_t val) { int64_t getValOfDiffPrecision(int8_t unit, int64_t val) {
int64_t v = 0; int64_t v = 0;
switch(unit) { switch (unit) {
case 's': v = val / 1000; break; case 's':
case 'm': v = val / tsTickPerMin[TSDB_TIME_PRECISION_MILLI]; break; v = val / 1000;
case 'h': v = val / (tsTickPerMin[TSDB_TIME_PRECISION_MILLI] * 60); break; break;
case 'd': v = val / (tsTickPerMin[TSDB_TIME_PRECISION_MILLI] * 24 * 60); break; case 'm':
case 'w': v = val / (tsTickPerMin[TSDB_TIME_PRECISION_MILLI] * 24 * 60 * 7); break; v = val / tsTickPerMin[TSDB_TIME_PRECISION_MILLI];
break;
case 'h':
v = val / (tsTickPerMin[TSDB_TIME_PRECISION_MILLI] * 60);
break;
case 'd':
v = val / (tsTickPerMin[TSDB_TIME_PRECISION_MILLI] * 24 * 60);
break;
case 'w':
v = val / (tsTickPerMin[TSDB_TIME_PRECISION_MILLI] * 24 * 60 * 7);
break;
default: default:
break; break;
} }
...@@ -1446,32 +1484,32 @@ int64_t getValOfDiffPrecision(int8_t unit, int64_t val) { ...@@ -1446,32 +1484,32 @@ int64_t getValOfDiffPrecision(int8_t unit, int64_t val) {
return v; return v;
} }
char* buildRetension(SArray* pRetension) { char *buildRetension(SArray *pRetension) {
size_t size = taosArrayGetSize(pRetension); size_t size = taosArrayGetSize(pRetension);
if (size == 0) { if (size == 0) {
return NULL; return NULL;
} }
char* p1 = taosMemoryCalloc(1, 100); char *p1 = taosMemoryCalloc(1, 100);
SRetention* p = taosArrayGet(pRetension, 0); SRetention *p = taosArrayGet(pRetension, 0);
int32_t len = 2; int32_t len = 2;
int64_t v1 = getValOfDiffPrecision(p->freqUnit, p->freq); int64_t v1 = getValOfDiffPrecision(p->freqUnit, p->freq);
int64_t v2 = getValOfDiffPrecision(p->keepUnit, p->keep); int64_t v2 = getValOfDiffPrecision(p->keepUnit, p->keep);
len += sprintf(p1 + len, "%"PRId64"%c:%"PRId64"%c,", v1, p->freqUnit, v2, p->keepUnit); len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c,", v1, p->freqUnit, v2, p->keepUnit);
p = taosArrayGet(pRetension, 1); p = taosArrayGet(pRetension, 1);
v1 = getValOfDiffPrecision(p->freqUnit, p->freq); v1 = getValOfDiffPrecision(p->freqUnit, p->freq);
v2 = getValOfDiffPrecision(p->keepUnit, p->keep); v2 = getValOfDiffPrecision(p->keepUnit, p->keep);
len += sprintf(p1 + len, "%"PRId64"%c:%"PRId64"%c,", v1, p->freqUnit, v2, p->keepUnit); len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c,", v1, p->freqUnit, v2, p->keepUnit);
p = taosArrayGet(pRetension, 2); p = taosArrayGet(pRetension, 2);
v1 = getValOfDiffPrecision(p->freqUnit, p->freq); v1 = getValOfDiffPrecision(p->freqUnit, p->freq);
v2 = getValOfDiffPrecision(p->keepUnit, p->keep); v2 = getValOfDiffPrecision(p->keepUnit, p->keep);
len += sprintf(p1 + len, "%"PRId64"%c:%"PRId64"%c", v1, p->freqUnit, v2, p->keepUnit); len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c", v1, p->freqUnit, v2, p->keepUnit);
varDataSetLen(p1, len); varDataSetLen(p1, len);
return p1; return p1;
...@@ -1586,7 +1624,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in ...@@ -1586,7 +1624,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
} }
colDataAppend(pColInfo, rows, cacheModel, null); colDataAppend(pColInfo, rows, cacheModel, null);
#endif #endif
colDataAppend(pColInfo, rows, (const char*) &pDb->cfg.cacheLastRow, false); colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.cacheLastRow, false);
char *prec = NULL; char *prec = NULL;
switch (pDb->cfg.precision) { switch (pDb->cfg.precision) {
...@@ -1618,7 +1656,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in ...@@ -1618,7 +1656,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.schemaless, false); colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.schemaless, false);
char* p = buildRetension(pDb->cfg.pRetensions); char *p = buildRetension(pDb->cfg.pRetensions);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols); pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
if (p == NULL) { if (p == NULL) {
......
...@@ -30,26 +30,26 @@ static int32_t mndQnodeActionInsert(SSdb *pSdb, SQnodeObj *pObj); ...@@ -30,26 +30,26 @@ static int32_t mndQnodeActionInsert(SSdb *pSdb, SQnodeObj *pObj);
static int32_t mndQnodeActionUpdate(SSdb *pSdb, SQnodeObj *pOld, SQnodeObj *pNew); static int32_t mndQnodeActionUpdate(SSdb *pSdb, SQnodeObj *pOld, SQnodeObj *pNew);
static int32_t mndQnodeActionDelete(SSdb *pSdb, SQnodeObj *pObj); static int32_t mndQnodeActionDelete(SSdb *pSdb, SQnodeObj *pObj);
static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq); static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq);
static int32_t mndProcessCreateQnodeRsp(SRpcMsg *pRsp);
static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq); static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq);
static int32_t mndProcessDropQnodeRsp(SRpcMsg *pRsp);
static int32_t mndProcessQnodeListReq(SRpcMsg *pReq); static int32_t mndProcessQnodeListReq(SRpcMsg *pReq);
static int32_t mndRetrieveQnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveQnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextQnode(SMnode *pMnode, void *pIter); static void mndCancelGetNextQnode(SMnode *pMnode, void *pIter);
int32_t mndInitQnode(SMnode *pMnode) { int32_t mndInitQnode(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_QNODE, SSdbTable table = {
.keyType = SDB_KEY_INT32, .sdbType = SDB_QNODE,
.encodeFp = (SdbEncodeFp)mndQnodeActionEncode, .keyType = SDB_KEY_INT32,
.decodeFp = (SdbDecodeFp)mndQnodeActionDecode, .encodeFp = (SdbEncodeFp)mndQnodeActionEncode,
.insertFp = (SdbInsertFp)mndQnodeActionInsert, .decodeFp = (SdbDecodeFp)mndQnodeActionDecode,
.updateFp = (SdbUpdateFp)mndQnodeActionUpdate, .insertFp = (SdbInsertFp)mndQnodeActionInsert,
.deleteFp = (SdbDeleteFp)mndQnodeActionDelete}; .updateFp = (SdbUpdateFp)mndQnodeActionUpdate,
.deleteFp = (SdbDeleteFp)mndQnodeActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_QNODE, mndProcessCreateQnodeReq); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_QNODE, mndProcessCreateQnodeReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_QNODE, mndProcessDropQnodeReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_QNODE, mndProcessDropQnodeReq);
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_QNODE_RSP, mndProcessCreateQnodeRsp); mndSetMsgHandle(pMnode, TDMT_DND_CREATE_QNODE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_DND_DROP_QNODE_RSP, mndProcessDropQnodeRsp); mndSetMsgHandle(pMnode, TDMT_DND_DROP_QNODE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_QNODE_LIST, mndProcessQnodeListReq); mndSetMsgHandle(pMnode, TDMT_MND_QNODE_LIST, mndProcessQnodeListReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QNODE, mndRetrieveQnodes); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QNODE, mndRetrieveQnodes);
...@@ -503,16 +503,6 @@ _OVER: ...@@ -503,16 +503,6 @@ _OVER:
return code; return code;
} }
static int32_t mndProcessCreateQnodeRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndProcessDropQnodeRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndRetrieveQnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { static int32_t mndRetrieveQnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
......
...@@ -38,25 +38,25 @@ static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb); ...@@ -38,25 +38,25 @@ static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb);
static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew); static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew);
static int32_t mndProcessMCreateSmaReq(SRpcMsg *pReq); static int32_t mndProcessMCreateSmaReq(SRpcMsg *pReq);
static int32_t mndProcessMDropSmaReq(SRpcMsg *pReq); static int32_t mndProcessMDropSmaReq(SRpcMsg *pReq);
static int32_t mndProcessVCreateSmaRsp(SRpcMsg *pRsp);
static int32_t mndProcessVDropSmaRsp(SRpcMsg *pRsp);
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq); static int32_t mndProcessGetSmaReq(SRpcMsg *pReq);
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextSma(SMnode *pMnode, void *pIter); static void mndCancelGetNextSma(SMnode *pMnode, void *pIter);
int32_t mndInitSma(SMnode *pMnode) { int32_t mndInitSma(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_SMA, SSdbTable table = {
.keyType = SDB_KEY_BINARY, .sdbType = SDB_SMA,
.encodeFp = (SdbEncodeFp)mndSmaActionEncode, .keyType = SDB_KEY_BINARY,
.decodeFp = (SdbDecodeFp)mndSmaActionDecode, .encodeFp = (SdbEncodeFp)mndSmaActionEncode,
.insertFp = (SdbInsertFp)mndSmaActionInsert, .decodeFp = (SdbDecodeFp)mndSmaActionDecode,
.updateFp = (SdbUpdateFp)mndSmaActionUpdate, .insertFp = (SdbInsertFp)mndSmaActionInsert,
.deleteFp = (SdbDeleteFp)mndSmaActionDelete}; .updateFp = (SdbUpdateFp)mndSmaActionUpdate,
.deleteFp = (SdbDeleteFp)mndSmaActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_SMA, mndProcessMCreateSmaReq); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_SMA, mndProcessMCreateSmaReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessMDropSmaReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessMDropSmaReq);
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndProcessVCreateSmaRsp); mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndProcessVDropSmaRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq); mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveSma); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveSma);
...@@ -637,11 +637,6 @@ _OVER: ...@@ -637,11 +637,6 @@ _OVER:
return code; return code;
} }
static int32_t mndProcessVCreateSmaRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndSetDropSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) { static int32_t mndSetDropSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
SSdbRaw *pRedoRaw = mndSmaActionEncode(pSma); SSdbRaw *pRedoRaw = mndSmaActionEncode(pSma);
if (pRedoRaw == NULL) return -1; if (pRedoRaw == NULL) return -1;
...@@ -910,11 +905,6 @@ _OVER: ...@@ -910,11 +905,6 @@ _OVER:
return code; return code;
} }
static int32_t mndProcessVDropSmaRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
......
...@@ -30,25 +30,25 @@ static int32_t mndSnodeActionInsert(SSdb *pSdb, SSnodeObj *pObj); ...@@ -30,25 +30,25 @@ static int32_t mndSnodeActionInsert(SSdb *pSdb, SSnodeObj *pObj);
static int32_t mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOld, SSnodeObj *pNew); static int32_t mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOld, SSnodeObj *pNew);
static int32_t mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj); static int32_t mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj);
static int32_t mndProcessCreateSnodeReq(SRpcMsg *pReq); static int32_t mndProcessCreateSnodeReq(SRpcMsg *pReq);
static int32_t mndProcessCreateSnodeRsp(SRpcMsg *pRsp);
static int32_t mndProcessDropSnodeReq(SRpcMsg *pReq); static int32_t mndProcessDropSnodeReq(SRpcMsg *pReq);
static int32_t mndProcessDropSnodeRsp(SRpcMsg *pRsp);
static int32_t mndRetrieveSnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveSnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextSnode(SMnode *pMnode, void *pIter); static void mndCancelGetNextSnode(SMnode *pMnode, void *pIter);
int32_t mndInitSnode(SMnode *pMnode) { int32_t mndInitSnode(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_SNODE, SSdbTable table = {
.keyType = SDB_KEY_INT32, .sdbType = SDB_SNODE,
.encodeFp = (SdbEncodeFp)mndSnodeActionEncode, .keyType = SDB_KEY_INT32,
.decodeFp = (SdbDecodeFp)mndSnodeActionDecode, .encodeFp = (SdbEncodeFp)mndSnodeActionEncode,
.insertFp = (SdbInsertFp)mndSnodeActionInsert, .decodeFp = (SdbDecodeFp)mndSnodeActionDecode,
.updateFp = (SdbUpdateFp)mndSnodeActionUpdate, .insertFp = (SdbInsertFp)mndSnodeActionInsert,
.deleteFp = (SdbDeleteFp)mndSnodeActionDelete}; .updateFp = (SdbUpdateFp)mndSnodeActionUpdate,
.deleteFp = (SdbDeleteFp)mndSnodeActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_SNODE, mndProcessCreateSnodeReq); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_SNODE, mndProcessCreateSnodeReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_SNODE, mndProcessDropSnodeReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_SNODE, mndProcessDropSnodeReq);
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_SNODE_RSP, mndProcessCreateSnodeRsp); mndSetMsgHandle(pMnode, TDMT_DND_CREATE_SNODE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_DND_DROP_SNODE_RSP, mndProcessDropSnodeRsp); mndSetMsgHandle(pMnode, TDMT_DND_DROP_SNODE_RSP, mndTransProcessRsp);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SNODE, mndRetrieveSnodes); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SNODE, mndRetrieveSnodes);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_SNODE, mndCancelGetNextSnode); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_SNODE, mndCancelGetNextSnode);
...@@ -437,16 +437,6 @@ _OVER: ...@@ -437,16 +437,6 @@ _OVER:
return code; return code;
} }
static int32_t mndProcessCreateSnodeRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndProcessDropSnodeRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndRetrieveSnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { static int32_t mndRetrieveSnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
......
...@@ -38,9 +38,6 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew); ...@@ -38,9 +38,6 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew);
static int32_t mndProcessMCreateStbReq(SRpcMsg *pReq); static int32_t mndProcessMCreateStbReq(SRpcMsg *pReq);
static int32_t mndProcessMAlterStbReq(SRpcMsg *pReq); static int32_t mndProcessMAlterStbReq(SRpcMsg *pReq);
static int32_t mndProcessMDropStbReq(SRpcMsg *pReq); static int32_t mndProcessMDropStbReq(SRpcMsg *pReq);
static int32_t mndProcessVCreateStbRsp(SRpcMsg *pRsp);
static int32_t mndProcessVAlterStbRsp(SRpcMsg *pRsp);
static int32_t mndProcessVDropStbRsp(SRpcMsg *pRsp);
static int32_t mndProcessTableMetaReq(SRpcMsg *pReq); static int32_t mndProcessTableMetaReq(SRpcMsg *pReq);
static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter); static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
...@@ -59,9 +56,9 @@ int32_t mndInitStb(SMnode *pMnode) { ...@@ -59,9 +56,9 @@ int32_t mndInitStb(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessMCreateStbReq); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessMCreateStbReq);
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessMAlterStbReq); mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessMAlterStbReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessMDropStbReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessMDropStbReq);
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndProcessVCreateStbRsp); mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndProcessVAlterStbRsp); mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndProcessVDropStbRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq); mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb);
...@@ -837,11 +834,6 @@ _OVER: ...@@ -837,11 +834,6 @@ _OVER:
return code; return code;
} }
static int32_t mndProcessVCreateStbRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) { static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) {
if (pAlter->commentLen != 0 || pAlter->ttl != 0) return 0; if (pAlter->commentLen != 0 || pAlter->ttl != 0) return 0;
...@@ -1459,11 +1451,6 @@ _OVER: ...@@ -1459,11 +1451,6 @@ _OVER:
return code; return code;
} }
static int32_t mndProcessVAlterStbRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndSetDropStbRedoLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) { static int32_t mndSetDropStbRedoLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb); SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
if (pRedoRaw == NULL) return -1; if (pRedoRaw == NULL) return -1;
...@@ -1599,11 +1586,6 @@ _OVER: ...@@ -1599,11 +1586,6 @@ _OVER:
return code; return code;
} }
static int32_t mndProcessVDropStbRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndProcessTableMetaReq(SRpcMsg *pReq) { static int32_t mndProcessTableMetaReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
int32_t code = -1; int32_t code = -1;
......
...@@ -35,7 +35,6 @@ static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream); ...@@ -35,7 +35,6 @@ static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream); static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream);
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq); static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
static int32_t mndProcessTaskDeployInternalRsp(SRpcMsg *pRsp);
/*static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);*/ /*static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);*/
/*static int32_t mndProcessDropStreamInRsp(SRpcMsg *pRsp);*/ /*static int32_t mndProcessDropStreamInRsp(SRpcMsg *pRsp);*/
static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq); static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq);
...@@ -44,17 +43,19 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB ...@@ -44,17 +43,19 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter); static void mndCancelGetNextStream(SMnode *pMnode, void *pIter);
int32_t mndInitStream(SMnode *pMnode) { int32_t mndInitStream(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_STREAM, SSdbTable table = {
.keyType = SDB_KEY_BINARY, .sdbType = SDB_STREAM,
.encodeFp = (SdbEncodeFp)mndStreamActionEncode, .keyType = SDB_KEY_BINARY,
.decodeFp = (SdbDecodeFp)mndStreamActionDecode, .encodeFp = (SdbEncodeFp)mndStreamActionEncode,
.insertFp = (SdbInsertFp)mndStreamActionInsert, .decodeFp = (SdbDecodeFp)mndStreamActionDecode,
.updateFp = (SdbUpdateFp)mndStreamActionUpdate, .insertFp = (SdbInsertFp)mndStreamActionInsert,
.deleteFp = (SdbDeleteFp)mndStreamActionDelete}; .updateFp = (SdbUpdateFp)mndStreamActionUpdate,
.deleteFp = (SdbDeleteFp)mndStreamActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
mndSetMsgHandle(pMnode, TDMT_VND_TASK_DEPLOY_RSP, mndProcessTaskDeployInternalRsp); mndSetMsgHandle(pMnode, TDMT_VND_TASK_DEPLOY_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_SND_TASK_DEPLOY_RSP, mndProcessTaskDeployInternalRsp); mndSetMsgHandle(pMnode, TDMT_SND_TASK_DEPLOY_RSP, mndTransProcessRsp);
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/ /*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/ /*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/
...@@ -195,11 +196,6 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) { ...@@ -195,11 +196,6 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);
} }
static int32_t mndProcessTaskDeployInternalRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static SDbObj *mndAcquireDbByStream(SMnode *pMnode, char *streamName) { static SDbObj *mndAcquireDbByStream(SMnode *pMnode, char *streamName) {
SName name = {0}; SName name = {0};
tNameFromString(&name, streamName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&name, streamName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
......
...@@ -43,7 +43,6 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubs ...@@ -43,7 +43,6 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubs
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg); static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg);
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg); static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg);
static int32_t mndProcessSubscribeInternalRsp(SRpcMsg *pMsg);
static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter); static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
...@@ -65,20 +64,22 @@ static int32_t mndSetSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeO ...@@ -65,20 +64,22 @@ static int32_t mndSetSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeO
} }
int32_t mndInitSubscribe(SMnode *pMnode) { int32_t mndInitSubscribe(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_SUBSCRIBE, SSdbTable table = {
.keyType = SDB_KEY_BINARY, .sdbType = SDB_SUBSCRIBE,
.encodeFp = (SdbEncodeFp)mndSubActionEncode, .keyType = SDB_KEY_BINARY,
.decodeFp = (SdbDecodeFp)mndSubActionDecode, .encodeFp = (SdbEncodeFp)mndSubActionEncode,
.insertFp = (SdbInsertFp)mndSubActionInsert, .decodeFp = (SdbDecodeFp)mndSubActionDecode,
.updateFp = (SdbUpdateFp)mndSubActionUpdate, .insertFp = (SdbInsertFp)mndSubActionInsert,
.deleteFp = (SdbDeleteFp)mndSubActionDelete}; .updateFp = (SdbUpdateFp)mndSubActionUpdate,
.deleteFp = (SdbDeleteFp)mndSubActionDelete,
mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndProcessSubscribeInternalRsp); };
mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_DELETE_RSP, mndProcessSubscribeInternalRsp);
mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_DELETE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq); mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq); mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DROP_CGROUP, mndProcessDropCgroupReq); mndSetMsgHandle(pMnode, TDMT_MND_MQ_DROP_CGROUP, mndProcessDropCgroupReq);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DROP_CGROUP_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_MND_MQ_DROP_CGROUP_RSP, mndTransProcessRsp);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextSubscribe); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextSubscribe);
...@@ -789,11 +790,6 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) { ...@@ -789,11 +790,6 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
sdbRelease(pSdb, pSub); sdbRelease(pSdb, pSub);
} }
static int32_t mndProcessSubscribeInternalRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndSetDropSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) { static int32_t mndSetDropSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
SSdbRaw *pRedoRaw = mndSubActionEncode(pSub); SSdbRaw *pRedoRaw = mndSubActionEncode(pSub);
if (pRedoRaw == NULL) return -1; if (pRedoRaw == NULL) return -1;
......
...@@ -37,7 +37,6 @@ static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic); ...@@ -37,7 +37,6 @@ static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic); static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic);
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq); static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq);
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq); static int32_t mndProcessDropTopicReq(SRpcMsg *pReq);
static int32_t mndProcessDropTopicInRsp(SRpcMsg *pRsp);
static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter); static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
...@@ -45,17 +44,19 @@ static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter); ...@@ -45,17 +44,19 @@ static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic); static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);
int32_t mndInitTopic(SMnode *pMnode) { int32_t mndInitTopic(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_TOPIC, SSdbTable table = {
.keyType = SDB_KEY_BINARY, .sdbType = SDB_TOPIC,
.encodeFp = (SdbEncodeFp)mndTopicActionEncode, .keyType = SDB_KEY_BINARY,
.decodeFp = (SdbDecodeFp)mndTopicActionDecode, .encodeFp = (SdbEncodeFp)mndTopicActionEncode,
.insertFp = (SdbInsertFp)mndTopicActionInsert, .decodeFp = (SdbDecodeFp)mndTopicActionDecode,
.updateFp = (SdbUpdateFp)mndTopicActionUpdate, .insertFp = (SdbInsertFp)mndTopicActionInsert,
.deleteFp = (SdbDeleteFp)mndTopicActionDelete}; .updateFp = (SdbUpdateFp)mndTopicActionUpdate,
.deleteFp = (SdbDeleteFp)mndTopicActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndTransProcessRsp);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);
...@@ -607,11 +608,6 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { ...@@ -607,11 +608,6 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
return TSDB_CODE_ACTION_IN_PROGRESS; return TSDB_CODE_ACTION_IN_PROGRESS;
} }
static int32_t mndProcessDropTopicInRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) { static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SDbObj *pDb = mndAcquireDb(pMnode, dbName); SDbObj *pDb = mndAcquireDb(pMnode, dbName);
......
...@@ -29,11 +29,6 @@ static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup); ...@@ -29,11 +29,6 @@ static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew); static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
static int32_t mndProcessCreateVnodeRsp(SRpcMsg *pRsp);
static int32_t mndProcessAlterVnodeRsp(SRpcMsg *pRsp);
static int32_t mndProcessDropVnodeRsp(SRpcMsg *pRsp);
static int32_t mndProcessCompactVnodeRsp(SRpcMsg *pRsp);
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter); static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
...@@ -50,11 +45,12 @@ int32_t mndInitVgroup(SMnode *pMnode) { ...@@ -50,11 +45,12 @@ int32_t mndInitVgroup(SMnode *pMnode) {
.deleteFp = (SdbDeleteFp)mndVgroupActionDelete, .deleteFp = (SdbDeleteFp)mndVgroupActionDelete,
}; };
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndProcessCreateVnodeRsp); mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_REPLICA_RSP, mndProcessAlterVnodeRsp); mndSetMsgHandle(pMnode, TDMT_VND_ALTER_REPLICA_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIG_RSP, mndProcessAlterVnodeRsp); mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIG_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndProcessDropVnodeRsp); mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIRM_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndProcessCompactVnodeRsp); mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndTransProcessRsp);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup);
...@@ -512,12 +508,12 @@ int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) { ...@@ -512,12 +508,12 @@ int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
mDebug("dnode:%d, equivalent vnodes:%d", pDnode->id, pDnode->numOfVnodes); mDebug("dnode:%d, equivalent vnodes:%d", pDnode->id, pDnode->numOfVnodes);
} }
int32_t maxPos = 1; SVnodeGid *pVgid = &pVgroup->vnodeGid[pVgroup->replica];
for (int32_t d = 0; d < taosArrayGetSize(pArray); ++d) { for (int32_t d = 0; d < taosArrayGetSize(pArray); ++d) {
SDnodeObj *pDnode = taosArrayGet(pArray, d); SDnodeObj *pDnode = taosArrayGet(pArray, d);
bool used = false; bool used = false;
for (int32_t vn = 0; vn < maxPos; ++vn) { for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
if (pDnode->id == pVgroup->vnodeGid[vn].dnodeId) { if (pDnode->id == pVgroup->vnodeGid[vn].dnodeId) {
used = true; used = true;
break; break;
...@@ -530,59 +526,58 @@ int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) { ...@@ -530,59 +526,58 @@ int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
return -1; return -1;
} }
SVnodeGid *pVgid = &pVgroup->vnodeGid[maxPos];
pVgid->dnodeId = pDnode->id; pVgid->dnodeId = pDnode->id;
pVgid->role = TAOS_SYNC_STATE_ERROR; pVgid->role = TAOS_SYNC_STATE_ERROR;
pDnode->numOfVnodes++; mInfo("db:%s, vgId:%d, vn:%d dnode:%d, is added", pVgroup->dbName, pVgroup->vgId, pVgroup->replica,
pVgid->dnodeId);
mInfo("db:%s, vgId:%d, vnode_index:%d dnode:%d is added", pVgroup->dbName, pVgroup->vgId, maxPos, pVgid->dnodeId); pVgroup->replica++;
maxPos++; pDnode->numOfVnodes++;
if (maxPos == 3) return 0; return 0;
} }
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
mError("db:%s, failed to add vnode to vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, terrstr());
return -1; return -1;
} }
int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray, SVnodeGid *del1, SVnodeGid *del2) { int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray, SVnodeGid *pDelVgid) {
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes); taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
SDnodeObj *pDnode = taosArrayGet(pArray, i); SDnodeObj *pDnode = taosArrayGet(pArray, i);
mDebug("dnode:%d, equivalent vnodes:%d", pDnode->id, pDnode->numOfVnodes); mDebug("dnode:%d, equivalent vnodes:%d", pDnode->id, pDnode->numOfVnodes);
} }
int32_t removedNum = 0; int32_t code = -1;
for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) { for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
SDnodeObj *pDnode = taosArrayGet(pArray, d); SDnodeObj *pDnode = taosArrayGet(pArray, d);
for (int32_t vn = 0; vn < TSDB_MAX_REPLICA; ++vn) { for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[vn]; SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
if (pVgid->dnodeId == pDnode->id) { if (pVgid->dnodeId == pDnode->id) {
if (removedNum == 0) *del1 = *pVgid; mInfo("db:%s, vgId:%d, vn:%d dnode:%d, is removed", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
if (removedNum == 1) *del2 = *pVgid;
mInfo("db:%s, vgId:%d, vn:%d dnode:%d is removed", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
memset(pVgid, 0, sizeof(SVnodeGid));
removedNum++;
pDnode->numOfVnodes--; pDnode->numOfVnodes--;
pVgroup->replica--;
if (removedNum == 2) goto _OVER; *pDelVgid = *pVgid;
*pVgid = pVgroup->vnodeGid[pVgroup->replica];
memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
code = 0;
goto _OVER;
} }
} }
} }
_OVER: _OVER:
if (removedNum != 2) return -1; if (code != 0) {
terrno = TSDB_CODE_APP_ERROR;
mError("db:%s, failed to remove vnode from vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, terrstr());
return -1;
}
for (int32_t vn = 1; vn < TSDB_MAX_REPLICA; ++vn) { for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[vn]; SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
if (pVgid->dnodeId != 0) { mInfo("db:%s, vgId:%d, vn:%d dnode:%d is reserved", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
memcpy(&pVgroup->vnodeGid[0], pVgid, sizeof(SVnodeGid));
memset(pVgid, 0, sizeof(SVnodeGid));
}
} }
mInfo("db:%s, vgId:%d, dnode:%d is keeped", pVgroup->dbName, pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId);
return 0; return 0;
} }
...@@ -605,23 +600,6 @@ SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) { ...@@ -605,23 +600,6 @@ SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) {
return epset; return epset;
} }
static int32_t mndProcessCreateVnodeRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndProcessAlterVnodeRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndProcessDropVnodeRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndProcessCompactVnodeRsp(SRpcMsg *pRsp) { return 0; }
static bool mndGetVgroupMaxReplicaFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) { static bool mndGetVgroupMaxReplicaFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
SVgObj *pVgroup = pObj; SVgObj *pVgroup = pObj;
int64_t uid = *(int64_t *)p1; int64_t uid = *(int64_t *)p1;
......
...@@ -15,14 +15,15 @@ ...@@ -15,14 +15,15 @@
#include "vnd.h" #include "vnd.h"
static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp); static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp); static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
...@@ -99,11 +100,11 @@ int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -99,11 +100,11 @@ int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) {
return code; return code;
} }
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) { int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
void *ptr = NULL; void *ptr = NULL;
void *pReq; void *pReq;
int len; int32_t len;
int ret; int32_t ret;
vTrace("vgId:%d, start to process write request %s, version %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), vTrace("vgId:%d, start to process write request %s, version %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
version); version);
...@@ -158,6 +159,9 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg ...@@ -158,6 +159,9 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
pMsg->contLen - sizeof(SMsgHead)) < 0) { pMsg->contLen - sizeof(SMsgHead)) < 0) {
} }
} break; } break;
case TDMT_VND_ALTER_CONFIRM:
vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp);
break;
case TDMT_VND_ALTER_CONFIG: case TDMT_VND_ALTER_CONFIG:
break; break;
default: default:
...@@ -190,7 +194,7 @@ _err: ...@@ -190,7 +194,7 @@ _err:
return -1; return -1;
} }
int32_t vnodePreprocessQueryMsg(SVnode * pVnode, SRpcMsg * pMsg) { int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
if (TDMT_VND_QUERY != pMsg->msgType) { if (TDMT_VND_QUERY != pMsg->msgType) {
return 0; return 0;
} }
...@@ -212,7 +216,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -212,7 +216,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
} }
} }
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace("message in fetch queue is processing"); vTrace("message in fetch queue is processing");
char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
...@@ -267,7 +271,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) { ...@@ -267,7 +271,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
pMetaRsp->precision = pVnode->config.tsdbCfg.precision; pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
} }
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR; int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
if (syncEnvIsStart()) { if (syncEnvIsStart()) {
...@@ -357,7 +361,7 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -357,7 +361,7 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
return ret; return ret;
} }
static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) { static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SVCreateStbReq req = {0}; SVCreateStbReq req = {0};
SDecoder coder; SDecoder coder;
...@@ -389,9 +393,9 @@ _err: ...@@ -389,9 +393,9 @@ _err:
return -1; return -1;
} }
static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) { static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SDecoder decoder = {0}; SDecoder decoder = {0};
int rcode = 0; int32_t rcode = 0;
SVCreateTbBatchReq req = {0}; SVCreateTbBatchReq req = {0};
SVCreateTbReq *pCreateReq; SVCreateTbReq *pCreateReq;
SVCreateTbBatchRsp rsp = {0}; SVCreateTbBatchRsp rsp = {0};
...@@ -422,7 +426,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, ...@@ -422,7 +426,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
} }
// loop to create table // loop to create table
for (int iReq = 0; iReq < req.nReqs; iReq++) { for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq; pCreateReq = req.pReqs + iReq;
// validate hash // validate hash
...@@ -477,7 +481,7 @@ _exit: ...@@ -477,7 +481,7 @@ _exit:
return rcode; return rcode;
} }
static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SVCreateStbReq req = {0}; SVCreateStbReq req = {0};
SDecoder dc = {0}; SDecoder dc = {0};
...@@ -506,9 +510,9 @@ static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, ...@@ -506,9 +510,9 @@ static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq,
return 0; return 0;
} }
static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SVDropStbReq req = {0}; SVDropStbReq req = {0};
int rcode = TSDB_CODE_SUCCESS; int32_t rcode = TSDB_CODE_SUCCESS;
SDecoder decoder = {0}; SDecoder decoder = {0};
pRsp->msgType = TDMT_VND_CREATE_STB_RSP; pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
...@@ -535,12 +539,12 @@ _exit: ...@@ -535,12 +539,12 @@ _exit:
return 0; return 0;
} }
static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SVAlterTbReq vAlterTbReq = {0}; SVAlterTbReq vAlterTbReq = {0};
SVAlterTbRsp vAlterTbRsp = {0}; SVAlterTbRsp vAlterTbRsp = {0};
SDecoder dc = {0}; SDecoder dc = {0};
int rcode = 0; int32_t rcode = 0;
int ret; int32_t ret;
SEncoder ec = {0}; SEncoder ec = {0};
STableMetaRsp vMetaRsp = {0}; STableMetaRsp vMetaRsp = {0};
...@@ -582,12 +586,12 @@ _exit: ...@@ -582,12 +586,12 @@ _exit:
return 0; return 0;
} }
static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SVDropTbBatchReq req = {0}; SVDropTbBatchReq req = {0};
SVDropTbBatchRsp rsp = {0}; SVDropTbBatchRsp rsp = {0};
SDecoder decoder = {0}; SDecoder decoder = {0};
SEncoder encoder = {0}; SEncoder encoder = {0};
int ret; int32_t ret;
SArray *tbUids = NULL; SArray *tbUids = NULL;
pRsp->msgType = TDMT_VND_DROP_TABLE_RSP; pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
...@@ -609,7 +613,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in ...@@ -609,7 +613,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp)); rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
if (tbUids == NULL || rsp.pArray == NULL) goto _exit; if (tbUids == NULL || rsp.pArray == NULL) goto _exit;
for (int iReq = 0; iReq < req.nReqs; iReq++) { for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
SVDropTbReq *pDropTbReq = req.pReqs + iReq; SVDropTbReq *pDropTbReq = req.pReqs + iReq;
SVDropTbRsp dropTbRsp = {0}; SVDropTbRsp dropTbRsp = {0};
...@@ -641,7 +645,8 @@ _exit: ...@@ -641,7 +645,8 @@ _exit:
return 0; return 0;
} }
static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, const char *tags) { static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter,
const char *tags) {
SSubmitBlkIter blkIter = {0}; SSubmitBlkIter blkIter = {0};
STSchema *pSchema = NULL; STSchema *pSchema = NULL;
tb_uid_t suid = 0; tb_uid_t suid = 0;
...@@ -675,7 +680,7 @@ static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSub ...@@ -675,7 +680,7 @@ static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSub
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) { static int32_t vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
ASSERT(pMsg != NULL); ASSERT(pMsg != NULL);
SSubmitMsgIter msgIter = {0}; SSubmitMsgIter msgIter = {0};
SMeta *pMeta = pVnode->pMeta; SMeta *pMeta = pVnode->pMeta;
...@@ -692,7 +697,7 @@ static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char ...@@ -692,7 +697,7 @@ static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char
return 0; return 0;
} }
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq; SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
SSubmitRsp submitRsp = {0}; SSubmitRsp submitRsp = {0};
SSubmitMsgIter msgIter = {0}; SSubmitMsgIter msgIter = {0};
...@@ -808,7 +813,7 @@ _exit: ...@@ -808,7 +813,7 @@ _exit:
return 0; return 0;
} }
static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SVCreateTSmaReq req = {0}; SVCreateTSmaReq req = {0};
SDecoder coder; SDecoder coder;
...@@ -859,3 +864,13 @@ _err: ...@@ -859,3 +864,13 @@ _err:
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen) { int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen) {
return vnodeProcessCreateTSmaReq(pVnode, 1, pCont, contLen, NULL); return vnodeProcessCreateTSmaReq(pVnode, 1, pCont, contLen, NULL);
} }
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
vInfo("vgId:%d, alter replica confim msg is processed", TD_VID(pVnode));
pRsp->msgType = TDMT_VND_ALTER_CONFIRM_RSP;
pRsp->code = TSDB_CODE_SUCCESS;
pRsp->pCont = NULL;
pRsp->contLen = 0;
return 0;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册