提交 293ad0bb 编写于 作者: S Shengliang Guan

TD-10431 process stable create msg in mnode

上级 7ef0ded3
......@@ -94,9 +94,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" )
// message from vnode to dnode
// message from mnode to vnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STB_IN, "create-stb-in" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STB_IN, "alter-stb-in" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STB_IN, "drop-stb-in" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STB_IN, "create-stb-internal" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STB_IN, "alter-stb-internal" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STB_IN, "drop-stb-internal" )
// message from mnode to mnode
// message from mnode to qnode
// message from mnode to dnode
......
......@@ -132,6 +132,36 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
*/
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
/**
* @brief Process a query message.
*
* @param pVnode The vnode object.
* @param pMsg The request message
* @param pRsp The response message
* @return int 0 for success, -1 for failure
*/
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
/**
* @brief Process a fetch message.
*
* @param pVnode The vnode object.
* @param pMsg The request message
* @param pRsp The response message
* @return int 0 for success, -1 for failure
*/
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
/**
* @brief Process a consume message.
*
* @param pVnode The vnode object.
* @param pMsg The request message
* @param pRsp The response message
* @return int 0 for success, -1 for failure
*/
int vnodeProcessConsumeMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
/* ------------------------ SVnodeCfg ------------------------ */
/**
* @brief Initialize VNODE options.
......@@ -186,31 +216,6 @@ void *vnodeParseReq(void *buf, SVnodeReq *pReq, uint8_t type);
#if 1
#include "taosmsg.h"
#include "trpc.h"
// typedef struct {
// char db[TSDB_FULL_DB_NAME_LEN];
// int32_t cacheBlockSize; // MB
// int32_t totalBlocks;
// int32_t daysPerFile;
// int32_t daysToKeep0;
// int32_t daysToKeep1;
// int32_t daysToKeep2;
// int32_t minRows;
// int32_t maxRows;
// int8_t precision; // time resolution
// int8_t compression;
// int8_t cacheLastRow;
// int8_t update;
// int8_t quorum;
// int8_t replica;
// int8_t selfIndex;
// int8_t walLevel;
// int32_t fsyncPeriod; // millisecond
// SReplica replicas[TSDB_MAX_REPLICA];
// } SVnodeCfg;
typedef enum {
VN_MSG_TYPE_WRITE = 1,
VN_MSG_TYPE_APPLY,
......
......@@ -270,6 +270,8 @@ TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
taosMsleep(100000);
}
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_STB, "show stables", 4, "1.d1");
......
......@@ -46,3 +46,4 @@ void stopServer(SServer* pServer);
SClient* createClient(const char* user, const char* pass, const char* fqdn, uint16_t port);
void dropClient(SClient* pClient);
void sendMsg(SClient* pClient, SRpcMsg* pMsg);
......@@ -26,8 +26,9 @@ int32_t mndInitVgroup(SMnode *pMnode);
void mndCleanupVgroup(SMnode *pMnode);
SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId);
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup);
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups);
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup);
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups);
SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup);
SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup);
SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup);
......
......@@ -21,6 +21,7 @@
#include "mndShow.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tname.h"
#define TSDB_STB_VER_NUMBER 1
......@@ -199,7 +200,54 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, char *stbName) {
return mndAcquireDb(pMnode, db);
}
static int32_t mndCheckStbMsg(SCreateStbMsg *pCreate) {
static SCreateStbInternalMsg *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) {
int32_t totalCols = pStb->numOfTags + pStb->numOfColumns;
int32_t contLen = totalCols * sizeof(SSchema) + sizeof(SCreateStbInternalMsg);
SCreateStbInternalMsg *pCreate = calloc(1, contLen);
if (pCreate == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pCreate->head.contLen = htonl(contLen);
pCreate->head.vgId = htonl(pVgroup->vgId);
memcpy(pCreate->name, pStb->name, TSDB_TABLE_FNAME_LEN);
pCreate->suid = htobe64(pStb->uid);
pCreate->sverson = htonl(pStb->version);
pCreate->ttl = 0;
pCreate->keep = 0;
pCreate->numOfTags = htonl(pStb->numOfTags);
pCreate->numOfColumns = htonl(pStb->numOfColumns);
memcpy(pCreate->pSchema, pStb->pSchema, totalCols * sizeof(SSchema));
for (int32_t t = 0; t < totalCols; ++t) {
SSchema *pSchema = &pCreate->pSchema[t];
pSchema->bytes = htonl(pSchema->bytes);
pSchema->colId = htonl(pSchema->colId);
}
return pCreate;
}
static SDropStbInternalMsg *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) {
int32_t contLen = sizeof(SDropStbInternalMsg);
SDropStbInternalMsg *pDrop = calloc(1, contLen);
if (pDrop == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pDrop->head.contLen = htonl(contLen);
pDrop->head.vgId = htonl(pVgroup->vgId);
memcpy(pDrop->name, pStb->name, TSDB_TABLE_FNAME_LEN);
pDrop->suid = htobe64(pStb->uid);
return pDrop;
}
static int32_t mndCheckCreateStbMsg(SCreateStbMsg *pCreate) {
pCreate->numOfColumns = htonl(pCreate->numOfColumns);
pCreate->numOfTags = htonl(pCreate->numOfTags);
int32_t totalCols = pCreate->numOfColumns + pCreate->numOfTags;
......@@ -248,7 +296,7 @@ static int32_t mndCheckStbMsg(SCreateStbMsg *pCreate) {
return 0;
}
static int32_t mndSetCreateStbRedoLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
static int32_t mndSetCreateStbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
......@@ -257,7 +305,7 @@ static int32_t mndSetCreateStbRedoLogs(SMnode *pMnode, STrans *pTrans, SStbObj *
return 0;
}
static int32_t mndSetCreateStbUndoLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
static int32_t mndSetCreateStbUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
SSdbRaw *pUndoRaw = mndStbActionEncode(pStb);
if (pUndoRaw == NULL) return -1;
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
......@@ -266,7 +314,7 @@ static int32_t mndSetCreateStbUndoLogs(SMnode *pMnode, STrans *pTrans, SStbObj *
return 0;
}
static int32_t mndSetCreateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
static int32_t mndSetCreateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
......@@ -275,60 +323,72 @@ static int32_t mndSetCreateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj
return 0;
}
static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
// for (int32_t vg = 0; vg < pDb->cfg.numOfVgroups; ++vg) {
// SVgObj *pVgroup = pVgroups + vg;
// for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
// STransAction action = {0};
// SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
void *pIter = NULL;
// SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
// if (pDnode == NULL) return -1;
// action.epSet = mndGetDnodeEpset(pDnode);
// mndReleaseDnode(pMnode, pDnode);
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) continue;
// SCreateVnodeMsg *pMsg = mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup);
// if (pMsg == NULL) return -1;
SCreateStbInternalMsg *pMsg = mndBuildCreateStbMsg(pMnode, pVgroup, pStb);
if (pMsg == NULL) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
// action.pCont = pMsg;
// action.contLen = sizeof(SCreateVnodeMsg);
// action.msgType = TSDB_MSG_TYPE_CREATE_VNODE_IN;
// if (mndTransAppendRedoAction(pTrans, &action) != 0) {
// free(pMsg);
// return -1;
// }
// }
// }
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = pMsg;
action.contLen = sizeof(SCreateStbInternalMsg);
action.msgType = TSDB_MSG_TYPE_CREATE_STB_IN;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg);
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
return -1;
}
sdbRelease(pSdb, pVgroup);
}
return 0;
}
static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
// for (int32_t vg = 0; vg < pDb->cfg.numOfVgroups; ++vg) {
// SVgObj *pVgroup = pVgroups + vg;
// for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
// STransAction action = {0};
// SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
void *pIter = NULL;
// SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
// if (pDnode == NULL) return -1;
// action.epSet = mndGetDnodeEpset(pDnode);
// mndReleaseDnode(pMnode, pDnode);
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) continue;
// SDropVnodeMsg *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup);
// if (pMsg == NULL) return -1;
SDropStbInternalMsg *pMsg = mndBuildDropStbMsg(pMnode, pVgroup, pStb);
if (pMsg == NULL) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
// action.pCont = pMsg;
// action.contLen = sizeof(SDropVnodeMsg);
// action.msgType = TSDB_MSG_TYPE_DROP_VNODE_IN;
// if (mndTransAppendUndoAction(pTrans, &action) != 0) {
// free(pMsg);
// return -1;
// }
// }
// }
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = pMsg;
action.contLen = sizeof(SDropStbInternalMsg);
action.msgType = TSDB_MSG_TYPE_DROP_STB_IN;
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
free(pMsg);
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
return -1;
}
sdbRelease(pSdb, pVgroup);
}
return 0;
}
......@@ -362,27 +422,27 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCre
}
mDebug("trans:%d, used to create stb:%s", pTrans->id, pCreate->name);
if (mndSetCreateStbRedoLogs(pMnode, pTrans, &stbObj) != 0) {
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) {
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
goto CREATE_STB_OVER;
}
if (mndSetCreateStbUndoLogs(pMnode, pTrans, &stbObj) != 0) {
if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, &stbObj) != 0) {
mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr());
goto CREATE_STB_OVER;
}
if (mndSetCreateStbCommitLogs(pMnode, pTrans, &stbObj) != 0) {
if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) {
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
goto CREATE_STB_OVER;
}
if (mndSetCreateStbRedoActions(pMnode, pTrans, &stbObj) != 0) {
if (mndSetCreateStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto CREATE_STB_OVER;
}
if (mndSetCreateStbUndoActions(pMnode, pTrans, &stbObj) != 0) {
if (mndSetCreateStbUndoActions(pMnode, pTrans, pDb, &stbObj) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto CREATE_STB_OVER;
}
......@@ -406,7 +466,7 @@ static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) {
mDebug("stb:%s, start to create", pCreate->name);
if (mndCheckStbMsg(pCreate) != 0) {
if (mndCheckCreateStbMsg(pCreate) != 0) {
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
}
......@@ -443,7 +503,10 @@ static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
static int32_t mndProcessCreateStbInRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessCreateStbInRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg);
return 0;
}
static int32_t mndCheckAlterStbMsg(SAlterStbMsg *pAlter) {
SSchema *pSchema = &pAlter->schema;
......@@ -504,7 +567,10 @@ static int32_t mndProcessAlterStbMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
static int32_t mndProcessAlterStbInRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessAlterStbInRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg);
return 0;
}
static int32_t mndSetDropStbRedoLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
......@@ -613,7 +679,10 @@ static int32_t mndProcessDropStbMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg);
return 0;
}
static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
......
......@@ -465,11 +465,15 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) {
}
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) {
return mndTransAppendAction(pTrans->redoActions, pAction);
int32_t code = mndTransAppendAction(pTrans->redoActions, pAction);
mTrace("trans:%d, msg:%s append to redo actions, code:0x%x", pTrans->id, taosMsg[pAction->msgType], code);
return code;
}
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) {
return mndTransAppendAction(pTrans->undoActions, pAction);
int32_t code = mndTransAppendAction(pTrans->undoActions, pAction);
mTrace("trans:%d, msg:%s append to undo actions, code:0x%x", pTrans->id, taosMsg[pAction->msgType], code);
return code;
}
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
......@@ -504,7 +508,6 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
return -1;
}
mDebug("trans:%d, prepare finished", pNewTrans->id);
pNewTrans->rpcHandle = pTrans->rpcHandle;
mndTransExecute(pMnode, pNewTrans);
mndReleaseTrans(pMnode, pNewTrans);
......
......@@ -311,6 +311,27 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
return 0;
}
SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup) {
SEpSet epset = {0};
for (int32_t v = 0; v < pVgroup->replica; ++v) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode == NULL) continue;
if (pVgid->role == TAOS_SYNC_STATE_LEADER) {
epset.inUse = epset.numOfEps;
}
epset.port[epset.numOfEps] = pDnode->port;
memcpy(&epset.fqdn[epset.numOfEps], pDnode->fqdn, TSDB_FQDN_LEN);
epset.numOfEps++;
mndReleaseDnode(pMnode, pDnode);
}
return epset;
}
static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg);
return 0;
......
......@@ -240,6 +240,8 @@ void sdbRelease(SSdb *pSdb, void *pObj) {
}
void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
*ppObj = NULL;
SHashObj *hash = sdbGetHash(pSdb, type);
if (hash == NULL) return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册