提交 8bba00f2 编写于 作者: S Shengliang Guan

TD-10431 build create vnode msg

上级 aace32e6
......@@ -750,31 +750,36 @@ typedef struct {
} SReplica;
typedef struct {
char db[TSDB_FULL_DB_NAME_LEN];
int32_t vgId;
int32_t dnodeId;
char db[TSDB_FULL_DB_NAME_LEN];
uint64_t dbUid;
int32_t cacheBlockSize;
int32_t totalBlocks;
int32_t daysPerFile;
int32_t daysToKeep0;
int32_t daysToKeep1;
int32_t daysToKeep2;
int32_t minRowsPerFileBlock;
int32_t maxRowsPerFileBlock;
int32_t minRows;
int32_t maxRows;
int32_t commitTime;
int32_t fsyncPeriod;
int8_t reserved[16];
int8_t walLevel;
int8_t precision;
int8_t compression;
int8_t cacheLastRow;
int8_t update;
int8_t walLevel;
int8_t quorum;
int8_t update;
int8_t cacheLastRow;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
} SCreateVnodeMsg, SAlterVnodeMsg;
typedef struct {
int32_t vgId;
int32_t vgId;
int32_t dnodeId;
char db[TSDB_FULL_DB_NAME_LEN];
uint64_t dbUid;
} SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg;
typedef struct {
......
......@@ -120,17 +120,18 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSC_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0X0224) //"Invalid tsc input")
// mnode-common
#define TSDB_CODE_MND_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0300)
#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0301)
#define TSDB_CODE_MND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0302)
#define TSDB_CODE_MND_ACTION_NEED_REPROCESSED TAOS_DEF_ERROR_CODE(0, 0x0303)
#define TSDB_CODE_MND_NO_RIGHTS TAOS_DEF_ERROR_CODE(0, 0x0304)
#define TSDB_CODE_MND_INVALID_OPTIONS TAOS_DEF_ERROR_CODE(0, 0x0305)
#define TSDB_CODE_MND_INVALID_CONNECTION TAOS_DEF_ERROR_CODE(0, 0x0306)
#define TSDB_CODE_MND_INVALID_MSG_VERSION TAOS_DEF_ERROR_CODE(0, 0x0307)
#define TSDB_CODE_MND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0308)
#define TSDB_CODE_MND_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x0309)
#define TSDB_CODE_MND_TOO_MANY_SHELL_CONNS TAOS_DEF_ERROR_CODE(0, 0x030A)
#define TSDB_CODE_MND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0300)
#define TSDB_CODE_MND_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0301)
#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0302)
#define TSDB_CODE_MND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0303)
#define TSDB_CODE_MND_ACTION_NEED_REPROCESSED TAOS_DEF_ERROR_CODE(0, 0x0304)
#define TSDB_CODE_MND_NO_RIGHTS TAOS_DEF_ERROR_CODE(0, 0x0305)
#define TSDB_CODE_MND_INVALID_OPTIONS TAOS_DEF_ERROR_CODE(0, 0x0306)
#define TSDB_CODE_MND_INVALID_CONNECTION TAOS_DEF_ERROR_CODE(0, 0x0307)
#define TSDB_CODE_MND_INVALID_MSG_VERSION TAOS_DEF_ERROR_CODE(0, 0x0308)
#define TSDB_CODE_MND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0309)
#define TSDB_CODE_MND_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x030A)
#define TSDB_CODE_MND_TOO_MANY_SHELL_CONNS TAOS_DEF_ERROR_CODE(0, 0x030B)
// mnode-show
#define TSDB_CODE_MND_INVALID_SHOWOBJ TAOS_DEF_ERROR_CODE(0, 0x0310)
......
......@@ -30,8 +30,8 @@ void mndTransDrop(STrans *pTrans);
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *, void *pMsg);
int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *, void *pMsg);
int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, SRpcMsg *pMsg);
int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, SRpcMsg *pMsg);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code);
char *mndTransStageStr(ETrnStage stage);
......
......@@ -28,7 +28,9 @@ SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId);
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup);
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups);
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup);
SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw);
SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup);
SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup);
#ifdef __cplusplus
}
......
......@@ -285,10 +285,62 @@ static int32_t mndSetCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVg
}
static int32_t mndSetRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) {
SVgObj *pVgroup = pVgroups + v;
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode == NULL) {
return -1;
}
SEpSet epset = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode);
SCreateVnodeMsg *pMsg = mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup);
if (pMsg == NULL) {
return -1;
}
SRpcMsg rpcMsg = {.msgType = TSDB_MSG_TYPE_ALTER_VNODE_IN, .pCont = pMsg, .contLen = sizeof(SCreateVnodeMsg)};
if (mndTransAppendRedoAction(pTrans, &epset, &rpcMsg) != 0) {
rpcFreeCont(pMsg);
return -1;
}
}
}
return 0;
}
static int32_t mndSetUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) {
SVgObj *pVgroup = pVgroups + v;
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode == NULL) {
return -1;
}
SEpSet epset = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode);
SDropVnodeMsg *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup);
if (pMsg == NULL) {
return -1;
}
SRpcMsg rpcMsg = {.msgType = TSDB_MSG_TYPE_DROP_VNODE_IN, .pCont = pMsg, .contLen = sizeof(SDropVnodeMsg)};
if (mndTransAppendUndoAction(pTrans, &epset, &rpcMsg) != 0) {
rpcFreeCont(pMsg);
return -1;
}
}
}
return 0;
}
......@@ -644,7 +696,7 @@ static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) {
return -1;
}
int32_t contLen = sizeof(SUseDbRsp) + pDb->cfg.numOfVgroups * sizeof(SVgroupInfo);
int32_t contLen = sizeof(SUseDbRsp) + pDb->cfg.numOfVgroups * sizeof(SVgroupInfo);
SUseDbRsp *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -180,8 +180,12 @@ static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOldDnode, SDnodeObj
}
SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) {
SSdb *pSdb = pMnode->pSdb;
return sdbAcquire(pSdb, SDB_DNODE, &dnodeId);
SSdb *pSdb = pMnode->pSdb;
SDnodeObj *pDnode = sdbAcquire(pSdb, SDB_DNODE, &dnodeId);
if (pDnode == NULL) {
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
}
return pDnode;
}
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) {
......
......@@ -21,6 +21,11 @@
#define TSDB_TRN_ARRAY_SIZE 8
#define TSDB_TRN_RESERVE_SIZE 64
typedef struct {
SEpSet epSet;
SRpcMsg msg;
} STransAction;
static SSdbRaw *mndTransActionEncode(STrans *pTrans);
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw);
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
......@@ -29,8 +34,10 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans);
static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle);
static void mndTransSendRpcRsp(STrans *pTrans, int32_t code);
static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw);
static void mndTransDropArray(SArray *pArray);
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw);
static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, SRpcMsg *pMsg);
static void mndTransDropLog(SArray *pArray);
static void mndTransDropAction(SArray *pArray);
static int32_t mndTransExecuteArray(SMnode *pMnode, SArray *pArray);
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans);
......@@ -58,7 +65,7 @@ int32_t mndInitTrans(SMnode *pMnode) {
void mndCleanupTrans(SMnode *pMnode) {}
static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
int32_t rawDataLen = 16 * sizeof(int32_t) + TSDB_TRN_RESERVE_SIZE;
int32_t rawDataLen = sizeof(STrans) + TSDB_TRN_RESERVE_SIZE;
int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs);
int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs);
int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs);
......@@ -80,6 +87,16 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
rawDataLen += sdbGetRawTotalSize(pTmp);
}
for (int32_t i = 0; i < redoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
rawDataLen += (sizeof(STransAction) + pAction->msg.contLen);
}
for (int32_t i = 0; i < undoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->undoActions, i);
rawDataLen += (sizeof(STransAction) + pAction->msg.contLen);
}
SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, TSDB_TRANS_VER, rawDataLen);
if (pRaw == NULL) {
mError("trans:%d, failed to alloc raw since %s", pTrans->id, terrstr());
......@@ -116,6 +133,22 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len)
}
for (int32_t i = 0; i < redoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet));
SDB_SET_INT8(pRaw, dataPos, pAction->msg.msgType)
SDB_SET_INT32(pRaw, dataPos, pAction->msg.contLen)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->msg.pCont, pAction->msg.contLen);
}
for (int32_t i = 0; i < undoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->undoActions, i);
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet));
SDB_SET_INT8(pRaw, dataPos, pAction->msg.msgType)
SDB_SET_INT32(pRaw, dataPos, pAction->msg.contLen)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->msg.pCont, pAction->msg.contLen);
}
SDB_SET_RESERVE(pRaw, dataPos, TSDB_TRN_RESERVE_SIZE)
SDB_SET_DATALEN(pRaw, dataPos);
mTrace("trans:%d, encode to raw:%p, len:%d", pTrans->id, pRaw, dataPos);
......@@ -147,8 +180,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
pTrans->redoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
pTrans->undoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
pTrans->commitLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
pTrans->redoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
pTrans->undoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
pTrans->redoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(STransAction));
pTrans->undoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(STransAction));
if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL ||
pTrans->redoActions == NULL || pTrans->undoActions == NULL) {
......@@ -175,42 +208,77 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
for (int32_t i = 0; i < redoLogNum; ++i) {
int32_t dataLen = 0;
SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen)
char *pData = malloc(dataLen);
SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen);
void *ret = taosArrayPush(pTrans->redoLogs, &pData);
if (ret == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto TRANS_DECODE_OVER;
break;
}
}
for (int32_t i = 0; i < undoLogNum; ++i) {
int32_t dataLen = 0;
SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen)
char *pData = malloc(dataLen);
SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen);
void *ret = taosArrayPush(pTrans->undoLogs, &pData);
if (ret == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto TRANS_DECODE_OVER;
break;
}
}
for (int32_t i = 0; i < commitLogNum; ++i) {
int32_t dataLen = 0;
SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen)
char *pData = malloc(dataLen);
SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen);
void *ret = taosArrayPush(pTrans->commitLogs, &pData);
if (ret == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto TRANS_DECODE_OVER;
break;
}
}
for (int32_t i = 0; i < redoActionNum; ++i) {
STransAction action = {0};
SDB_GET_BINARY(pRaw, pRow, dataPos, (void *)&action.epSet, sizeof(SEpSet));
SDB_GET_INT8(pRaw, pRow, dataPos, &action.msg.msgType)
SDB_GET_INT32(pRaw, pRow, dataPos, &action.msg.contLen)
action.msg.pCont = rpcMallocCont(action.msg.contLen);
if (action.msg.pCont == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto TRANS_DECODE_OVER;
}
SDB_GET_BINARY(pRaw, pRow, dataPos, action.msg.pCont, action.msg.contLen);
void *ret = taosArrayPush(pTrans->redoActions, &action);
if (ret == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto TRANS_DECODE_OVER;
}
}
for (int32_t i = 0; i < undoActionNum; ++i) {
STransAction action = {0};
SDB_GET_BINARY(pRaw, pRow, dataPos, (void *)&action.epSet, sizeof(SEpSet));
SDB_GET_INT8(pRaw, pRow, dataPos, &action.msg.msgType)
SDB_GET_INT32(pRaw, pRow, dataPos, &action.msg.contLen)
action.msg.pCont = rpcMallocCont(action.msg.contLen);
if (action.msg.pCont == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto TRANS_DECODE_OVER;
}
SDB_GET_BINARY(pRaw, pRow, dataPos, action.msg.pCont, action.msg.contLen);
void *ret = taosArrayPush(pTrans->undoActions, &action);
if (ret == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto TRANS_DECODE_OVER;
}
}
......@@ -237,11 +305,11 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) {
mTrace("trans:%d, perform delete action, stage:%s", pTrans->id, mndTransStageStr(pTrans->stage));
mndTransDropArray(pTrans->redoLogs);
mndTransDropArray(pTrans->undoLogs);
mndTransDropArray(pTrans->commitLogs);
mndTransDropArray(pTrans->redoActions);
mndTransDropArray(pTrans->undoActions);
mndTransDropLog(pTrans->redoLogs);
mndTransDropLog(pTrans->undoLogs);
mndTransDropLog(pTrans->commitLogs);
mndTransDropAction(pTrans->redoActions);
mndTransDropAction(pTrans->undoActions);
return 0;
}
......@@ -274,6 +342,8 @@ char *mndTransStageStr(ETrnStage stage) {
return "rollback";
case TRN_STAGE_RETRY:
return "retry";
case TRN_STAGE_OVER:
return "stop";
default:
return "undefined";
}
......@@ -305,8 +375,8 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
pTrans->redoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
pTrans->undoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
pTrans->commitLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
pTrans->redoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
pTrans->undoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
pTrans->redoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(STransAction));
pTrans->undoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(STransAction));
if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL ||
pTrans->redoActions == NULL || pTrans->undoActions == NULL) {
......@@ -319,7 +389,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
return pTrans;
}
static void mndTransDropArray(SArray *pArray) {
static void mndTransDropLog(SArray *pArray) {
for (int32_t i = 0; i < pArray->size; ++i) {
SSdbRaw *pRaw = taosArrayGetP(pArray, i);
tfree(pRaw);
......@@ -328,12 +398,21 @@ static void mndTransDropArray(SArray *pArray) {
taosArrayDestroy(pArray);
}
static void mndTransDropAction(SArray *pArray) {
for (int32_t i = 0; i < pArray->size; ++i) {
STransAction *pAction = taosArrayGet(pArray, i);
rpcFreeCont(pAction->msg.pCont);
}
taosArrayDestroy(pArray);
}
void mndTransDrop(STrans *pTrans) {
mndTransDropArray(pTrans->redoLogs);
mndTransDropArray(pTrans->undoLogs);
mndTransDropArray(pTrans->commitLogs);
mndTransDropArray(pTrans->redoActions);
mndTransDropArray(pTrans->undoActions);
mndTransDropLog(pTrans->redoLogs);
mndTransDropLog(pTrans->undoLogs);
mndTransDropLog(pTrans->commitLogs);
mndTransDropAction(pTrans->redoActions);
mndTransDropAction(pTrans->undoActions);
mDebug("trans:%d, data:%p is dropped", pTrans->id, pTrans);
tfree(pTrans);
......@@ -344,7 +423,7 @@ static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle) {
mTrace("trans:%d, set rpc handle:%p", pTrans->id, rpcHandle);
}
static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw) {
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) {
if (pArray == NULL || pRaw == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
......@@ -360,31 +439,43 @@ static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw) {
}
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) {
int32_t code = mndTransAppendArray(pTrans->redoLogs, pRaw);
int32_t code = mndTransAppendLog(pTrans->redoLogs, pRaw);
mTrace("trans:%d, raw:%p append to redo logs, code:0x%x", pTrans->id, pRaw, code);
return code;
}
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) {
int32_t code = mndTransAppendArray(pTrans->undoLogs, pRaw);
int32_t code = mndTransAppendLog(pTrans->undoLogs, pRaw);
mTrace("trans:%d, raw:%p append to undo logs, code:0x%x", pTrans->id, pRaw, code);
return code;
}
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
int32_t code = mndTransAppendArray(pTrans->commitLogs, pRaw);
int32_t code = mndTransAppendLog(pTrans->commitLogs, pRaw);
mTrace("trans:%d, raw:%p append to commit logs, code:0x%x", pTrans->id, pRaw, code);
return code;
}
int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) {
int32_t code = mndTransAppendArray(pTrans->redoActions, pMsg);
static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, SRpcMsg *pMsg) {
STransAction action = {.epSet = *pEpSet, .msg = *pMsg};
void *ptr = taosArrayPush(pArray, &action);
if (ptr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, SRpcMsg *pMsg) {
int32_t code = mndTransAppendAction(pTrans->redoActions, pEpSet, pMsg);
mTrace("trans:%d, msg:%p append to redo actions", pTrans->id, pMsg);
return code;
}
int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) {
int32_t code = mndTransAppendArray(pTrans->undoActions, pMsg);
int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, SRpcMsg *pMsg) {
int32_t code = mndTransAppendAction(pTrans->undoActions, pEpSet, pMsg);
mTrace("trans:%d, msg:%p append to undo actions", pTrans->id, pMsg);
return code;
}
......@@ -559,18 +650,37 @@ static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) {
return code;
}
static int32_t mndTransExecuteActions(SMnode *pMnode, SArray *pArray) {
SSdb *pSdb = pMnode->pSdb;
int32_t arraySize = taosArrayGetSize(pArray);
for (int32_t i = 0; i < arraySize; ++i) {
SSdbRaw *pRaw = taosArrayGetP(pArray, i);
int32_t code = sdbWriteNotFree(pSdb, pRaw);
if (code != 0) {
return code;
}
}
return 0;
}
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
if (taosArrayGetSize(pTrans->redoActions) != 0) {
mTrace("trans:%d, execute redo actions finished", pTrans->id);
}
return 0;
return code;
}
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
if (taosArrayGetSize(pTrans->undoActions) != 0) {
mTrace("trans:%d, execute undo actions finished", pTrans->id);
}
return 0;
return code;
}
static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
......
......@@ -24,9 +24,10 @@
#define TSDB_VGROUP_VER_NUM 1
#define TSDB_VGROUP_RESERVE_SIZE 64
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup);
static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw);
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup);
static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg);
static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg);
......@@ -156,6 +157,80 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
sdbRelease(pSdb, pVgroup);
}
SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) {
SCreateVnodeMsg *pCreate = rpcMallocCont(sizeof(SCreateVnodeMsg));
if (pCreate == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pCreate->dnodeId = htonl(pDnode->id);
pCreate->vgId = htonl(pVgroup->vgId);
memcpy(pCreate->db, pDb->name, TSDB_FULL_DB_NAME_LEN);
pCreate->dbUid = htobe64(pDb->uid);
pCreate->cacheBlockSize = htonl(pDb->cfg.cacheBlockSize);
pCreate->totalBlocks = htonl(pDb->cfg.totalBlocks);
pCreate->daysPerFile = htonl(pDb->cfg.daysPerFile);
pCreate->daysToKeep0 = htonl(pDb->cfg.daysToKeep0);
pCreate->daysToKeep1 = htonl(pDb->cfg.daysToKeep1);
pCreate->daysToKeep2 = htonl(pDb->cfg.daysToKeep2);
pCreate->minRows = htonl(pDb->cfg.minRows);
pCreate->maxRows = htonl(pDb->cfg.maxRows);
pCreate->commitTime = htonl(pDb->cfg.commitTime);
pCreate->fsyncPeriod = htonl(pDb->cfg.fsyncPeriod);
pCreate->walLevel = pDb->cfg.walLevel;
pCreate->precision = pDb->cfg.precision;
pCreate->compression = pDb->cfg.compression;
pCreate->quorum = pDb->cfg.quorum;
pCreate->update = pDb->cfg.update;
pCreate->cacheLastRow = pDb->cfg.cacheLastRow;
pCreate->replica = pVgroup->replica;
pCreate->selfIndex = -1;
for (int32_t v = 0; v < pVgroup->replica; ++v) {
SReplica *pReplica = &pCreate->replicas[v];
SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pVgidDnode == NULL) {
rpcFreeCont(pCreate);
terrno = TSDB_CODE_MND_APP_ERROR;
return NULL;
}
pReplica->id = htonl(pVgidDnode->id);
pReplica->port = htons(pVgidDnode->port);
memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
mndReleaseDnode(pMnode, pVgidDnode);
if (pDnode->id == pVgid->dnodeId) {
pCreate->selfIndex = v;
}
}
if (pCreate->selfIndex == -1) {
rpcFreeCont(pCreate);
terrno = TSDB_CODE_MND_APP_ERROR;
return NULL;
}
return pCreate;
}
SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) {
SDropVnodeMsg *pDrop = rpcMallocCont(sizeof(SDropVnodeMsg));
if (pDrop == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pDrop->dnodeId = htonl(pDnode->id);
pDrop->vgId = htonl(pVgroup->vgId);
memcpy(pDrop->db, pDb->name, TSDB_FULL_DB_NAME_LEN);
pDrop->dbUid = htobe64(pDb->uid);
return pDrop;
}
static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup) {
SSdb *pSdb = pMnode->pSdb;
int32_t allocedVnodes = 0;
......
......@@ -130,6 +130,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_VALUE_OUT_OF_RANGE, "Value out of range")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_INPUT, "Invalid tsc input")
// mnode-common
TAOS_DEFINE_ERROR(TSDB_CODE_MND_APP_ERROR, "Mnode internal error")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_READY, "Cluster not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, "Message not processed")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACTION_IN_PROGRESS, "Message is progressing")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册