提交 67cc6839 编写于 作者: H Hongze Cheng

more

上级 a60e9e40
...@@ -177,7 +177,7 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb ...@@ -177,7 +177,7 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb
} }
SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName) { SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName) {
SSdb *pSdb = pMnode->pSdb; SSdb * pSdb = pMnode->pSdb;
SStbObj *pStb = sdbAcquire(pSdb, SDB_STB, stbName); SStbObj *pStb = sdbAcquire(pSdb, SDB_STB, stbName);
if (pStb == NULL) { if (pStb == NULL) {
terrno = TSDB_CODE_MND_STB_NOT_EXIST; terrno = TSDB_CODE_MND_STB_NOT_EXIST;
...@@ -200,7 +200,36 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, char *stbName) { ...@@ -200,7 +200,36 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, char *stbName) {
return mndAcquireDb(pMnode, db); return mndAcquireDb(pMnode, db);
} }
static SCreateStbInternalMsg *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) { static void *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int *contLen) {
SVCreateTbReq req;
void * buf;
int bsize;
req.ver = 0;
req.name = pStb->name;
req.ttl = 0;
req.keep = 0;
req.type = TD_SUPER_TABLE;
req.stbCfg.suid = pStb->uid;
req.stbCfg.nCols = pStb->numOfColumns;
req.stbCfg.pSchema = pStb->pSchema;
req.stbCfg.nTagCols = pStb->numOfTags;
req.stbCfg.pTagSchema = pStb->pSchema + pStb->numOfColumns;
bsize = tSerializeSVCreateTbReq(NULL, &req);
buf = malloc(bsize);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
void *pBuf = buf;
tSerializeSVCreateTbReq(&pBuf, &req);
*contLen = bsize;
return buf;
#if 0
int32_t totalCols = pStb->numOfTags + pStb->numOfColumns; int32_t totalCols = pStb->numOfTags + pStb->numOfColumns;
int32_t contLen = totalCols * sizeof(SSchema) + sizeof(SCreateStbInternalMsg); int32_t contLen = totalCols * sizeof(SSchema) + sizeof(SCreateStbInternalMsg);
...@@ -226,8 +255,8 @@ static SCreateStbInternalMsg *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgro ...@@ -226,8 +255,8 @@ static SCreateStbInternalMsg *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgro
pSchema->bytes = htonl(pSchema->bytes); pSchema->bytes = htonl(pSchema->bytes);
pSchema->colId = htonl(pSchema->colId); pSchema->colId = htonl(pSchema->colId);
} }
return pCreate; return pCreate;
#endif
} }
static SDropStbInternalMsg *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) { static SDropStbInternalMsg *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) {
...@@ -324,16 +353,17 @@ static int32_t mndSetCreateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -324,16 +353,17 @@ static int32_t mndSetCreateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj
} }
static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
SSdb *pSdb = pMnode->pSdb; SSdb * pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
void *pIter = NULL; void * pIter = NULL;
int contLen;
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) continue; if (pVgroup->dbUid != pDb->uid) continue;
SCreateStbInternalMsg *pMsg = mndBuildCreateStbMsg(pMnode, pVgroup, pStb); void *pMsg = mndBuildCreateStbMsg(pMnode, pVgroup, pStb, &contLen);
if (pMsg == NULL) { if (pMsg == NULL) {
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
...@@ -344,7 +374,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -344,7 +374,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup); action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = pMsg; action.pCont = pMsg;
action.contLen = htonl(pMsg->head.contLen); action.contLen = htonl(contLen);
action.msgType = TDMT_VND_CREATE_STB; action.msgType = TDMT_VND_CREATE_STB;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg); free(pMsg);
...@@ -359,9 +389,9 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -359,9 +389,9 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
} }
static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
SSdb *pSdb = pMnode->pSdb; SSdb * pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
void *pIter = NULL; void * pIter = NULL;
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
...@@ -461,7 +491,7 @@ CREATE_STB_OVER: ...@@ -461,7 +491,7 @@ CREATE_STB_OVER:
} }
static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) { static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode * pMnode = pMsg->pMnode;
SCreateStbMsg *pCreate = pMsg->rpcMsg.pCont; SCreateStbMsg *pCreate = pMsg->rpcMsg.pCont;
mDebug("stb:%s, start to create", pCreate->name); mDebug("stb:%s, start to create", pCreate->name);
...@@ -484,7 +514,7 @@ static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) { ...@@ -484,7 +514,7 @@ static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) {
} }
} }
//topic should have different name with stb // topic should have different name with stb
SStbObj *pTopic = mndAcquireStb(pMnode, pCreate->name); SStbObj *pTopic = mndAcquireStb(pMnode, pCreate->name);
if (pTopic != NULL) { if (pTopic != NULL) {
sdbRelease(pMnode->pSdb, pTopic); sdbRelease(pMnode->pSdb, pTopic);
...@@ -545,7 +575,7 @@ static int32_t mndCheckAlterStbMsg(SAlterStbMsg *pAlter) { ...@@ -545,7 +575,7 @@ static int32_t mndCheckAlterStbMsg(SAlterStbMsg *pAlter) {
static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pOldStb, SStbObj *pNewStb) { return 0; } static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pOldStb, SStbObj *pNewStb) { return 0; }
static int32_t mndProcessAlterStbMsg(SMnodeMsg *pMsg) { static int32_t mndProcessAlterStbMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode * pMnode = pMsg->pMnode;
SAlterStbMsg *pAlter = pMsg->rpcMsg.pCont; SAlterStbMsg *pAlter = pMsg->rpcMsg.pCont;
mDebug("stb:%s, start to alter", pAlter->name); mDebug("stb:%s, start to alter", pAlter->name);
...@@ -659,7 +689,7 @@ DROP_STB_OVER: ...@@ -659,7 +689,7 @@ DROP_STB_OVER:
} }
static int32_t mndProcessDropStbMsg(SMnodeMsg *pMsg) { static int32_t mndProcessDropStbMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode * pMnode = pMsg->pMnode;
SDropStbMsg *pDrop = pMsg->rpcMsg.pCont; SDropStbMsg *pDrop = pMsg->rpcMsg.pCont;
mDebug("stb:%s, start to drop", pDrop->name); mDebug("stb:%s, start to drop", pDrop->name);
...@@ -694,7 +724,7 @@ static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) { ...@@ -694,7 +724,7 @@ static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) {
} }
static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) { static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode * pMnode = pMsg->pMnode;
STableInfoMsg *pInfo = pMsg->rpcMsg.pCont; STableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
mDebug("stb:%s, start to retrieve meta", pInfo->tableFname); mDebug("stb:%s, start to retrieve meta", pInfo->tableFname);
...@@ -766,7 +796,7 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs ...@@ -766,7 +796,7 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs
} }
int32_t numOfStbs = 0; int32_t numOfStbs = 0;
void *pIter = NULL; void * pIter = NULL;
while (1) { while (1) {
SStbObj *pStb = NULL; SStbObj *pStb = NULL;
pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb); pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
...@@ -785,7 +815,7 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs ...@@ -785,7 +815,7 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs
static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb; SSdb * pSdb = pMnode->pSdb;
if (mndGetNumOfStbs(pMnode, pShow->db, &pShow->numOfRows) != 0) { if (mndGetNumOfStbs(pMnode, pShow->db, &pShow->numOfRows) != 0) {
return -1; return -1;
...@@ -847,12 +877,12 @@ static void mndExtractTableName(char *tableId, char *name) { ...@@ -847,12 +877,12 @@ static void mndExtractTableName(char *tableId, char *name) {
} }
static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pMsg->pMnode; SMnode * pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb; SSdb * pSdb = pMnode->pSdb;
int32_t numOfRows = 0; int32_t numOfRows = 0;
SStbObj *pStb = NULL; SStbObj *pStb = NULL;
int32_t cols = 0; int32_t cols = 0;
char *pWrite; char * pWrite;
char prefix[64] = {0}; char prefix[64] = {0};
tstrncpy(prefix, pShow->db, 64); tstrncpy(prefix, pShow->db, 64);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册