提交 76a7b98b 编写于 作者: S Shengliang Guan

minor changes

上级 546b0f10
...@@ -290,37 +290,6 @@ typedef struct SSchema { ...@@ -290,37 +290,6 @@ typedef struct SSchema {
char name[TSDB_COL_NAME_LEN]; char name[TSDB_COL_NAME_LEN];
} SSchema; } SSchema;
typedef struct {
int32_t contLen;
int32_t vgId;
int8_t tableType;
int16_t numOfColumns;
int16_t numOfTags;
int32_t tid;
int32_t sversion;
int32_t tversion;
int32_t tagDataLen;
int32_t sqlDataLen;
uint64_t uid;
uint64_t superTableUid;
uint64_t createdTime;
char tableFname[TSDB_TABLE_FNAME_LEN];
char stbFname[TSDB_TABLE_FNAME_LEN];
char data[];
} SMDCreateTableMsg;
typedef struct {
int32_t len; // one create table message
char tableName[TSDB_TABLE_FNAME_LEN];
int8_t igExists;
int8_t getMeta;
int16_t numOfTags;
int16_t numOfColumns;
int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string
int8_t reserved[16];
char schema[];
} SCreateTableMsg;
typedef struct { typedef struct {
char name[TSDB_TABLE_FNAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];
int8_t igExists; int8_t igExists;
...@@ -341,16 +310,50 @@ typedef struct { ...@@ -341,16 +310,50 @@ typedef struct {
} SAlterStbMsg; } SAlterStbMsg;
typedef struct { typedef struct {
char tableFname[TSDB_TABLE_FNAME_LEN]; SMsgHead head;
char db[TSDB_FULL_DB_NAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];
int16_t type; /* operation type */ uint64_t suid;
int16_t numOfCols; /* number of schema */ int32_t sverson;
int32_t tagValLen; uint32_t ttl;
SSchema schema[]; uint32_t keep;
// tagVal is padded after schema int32_t numOfTags;
// char tagVal[]; int32_t numOfColumns;
SSchema pSchema[];
} SCreateStbInternalMsg;
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
uint64_t suid;
} SDropStbInternalMsg;
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
char stbFname[TSDB_TABLE_FNAME_LEN];
int8_t tableType;
uint64_t suid;
int32_t sversion;
int32_t numOfTags;
int32_t numOfColumns;
int32_t tagDataLen;
char data[];
} SCreateTableMsg;
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
int8_t type; /* operation type */
int32_t numOfCols; /* number of schema */
int32_t numOfTags;
char data[];
} SAlterTableMsg; } SAlterTableMsg;
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
} SDropTableMsg;
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
int64_t uid; int64_t uid;
......
...@@ -248,6 +248,91 @@ static int32_t mndCheckStbMsg(SCreateStbMsg *pCreate) { ...@@ -248,6 +248,91 @@ static int32_t mndCheckStbMsg(SCreateStbMsg *pCreate) {
return 0; return 0;
} }
static int32_t mndSetCreateStbRedoLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1;
return 0;
}
static int32_t mndSetCreateStbUndoLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
SSdbRaw *pUndoRaw = mndStbActionEncode(pStb);
if (pUndoRaw == NULL) return -1;
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1;
return 0;
}
static int32_t mndSetCreateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
return 0;
}
static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
// for (int32_t vg = 0; vg < pDb->cfg.numOfVgroups; ++vg) {
// SVgObj *pVgroup = pVgroups + vg;
// for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
// STransAction action = {0};
// SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
// SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
// if (pDnode == NULL) return -1;
// action.epSet = mndGetDnodeEpset(pDnode);
// mndReleaseDnode(pMnode, pDnode);
// SCreateVnodeMsg *pMsg = mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup);
// if (pMsg == NULL) return -1;
// action.pCont = pMsg;
// action.contLen = sizeof(SCreateVnodeMsg);
// action.msgType = TSDB_MSG_TYPE_CREATE_VNODE_IN;
// if (mndTransAppendRedoAction(pTrans, &action) != 0) {
// free(pMsg);
// return -1;
// }
// }
// }
return 0;
}
static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
// for (int32_t vg = 0; vg < pDb->cfg.numOfVgroups; ++vg) {
// SVgObj *pVgroup = pVgroups + vg;
// for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
// STransAction action = {0};
// SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
// SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
// if (pDnode == NULL) return -1;
// action.epSet = mndGetDnodeEpset(pDnode);
// mndReleaseDnode(pMnode, pDnode);
// SDropVnodeMsg *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup);
// if (pMsg == NULL) return -1;
// action.pCont = pMsg;
// action.contLen = sizeof(SDropVnodeMsg);
// action.msgType = TSDB_MSG_TYPE_DROP_VNODE_IN;
// if (mndTransAppendUndoAction(pTrans, &action) != 0) {
// free(pMsg);
// return -1;
// }
// }
// }
return 0;
}
static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCreate, SDbObj *pDb) { static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCreate, SDbObj *pDb) {
SStbObj stbObj = {0}; SStbObj stbObj = {0};
tstrncpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN); tstrncpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
...@@ -269,6 +354,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCre ...@@ -269,6 +354,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCre
} }
memcpy(stbObj.pSchema, pCreate->pSchema, totalSize); memcpy(stbObj.pSchema, pCreate->pSchema, totalSize);
int32_t code = 0;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("stb:%s, failed to create since %s", pCreate->name, terrstr()); mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
...@@ -276,29 +362,30 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCre ...@@ -276,29 +362,30 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCre
} }
mDebug("trans:%d, used to create stb:%s", pTrans->id, pCreate->name); mDebug("trans:%d, used to create stb:%s", pTrans->id, pCreate->name);
SSdbRaw *pRedoRaw = mndStbActionEncode(&stbObj); if (mndSetCreateStbRedoLogs(pMnode, pTrans, &stbObj) != 0) {
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); goto CREATE_STB_OVER;
mndTransDrop(pTrans);
return -1;
} }
sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING);
SSdbRaw *pUndoRaw = mndStbActionEncode(&stbObj); if (mndSetCreateStbUndoLogs(pMnode, pTrans, &stbObj) != 0) {
if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) { mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr());
mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr()); goto CREATE_STB_OVER;
mndTransDrop(pTrans);
return -1;
} }
sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED);
SSdbRaw *pCommitRaw = mndStbActionEncode(&stbObj); if (mndSetCreateStbCommitLogs(pMnode, pTrans, &stbObj) != 0) {
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); goto CREATE_STB_OVER;
mndTransDrop(pTrans); }
return -1;
if (mndSetCreateStbRedoActions(pMnode, pTrans, &stbObj) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto CREATE_STB_OVER;
}
if (mndSetCreateStbUndoActions(pMnode, pTrans, &stbObj) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto CREATE_STB_OVER;
} }
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
...@@ -306,8 +393,11 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCre ...@@ -306,8 +393,11 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCre
return -1; return -1;
} }
code = 0;
CREATE_STB_OVER:
mndTransDrop(pTrans); mndTransDrop(pTrans);
return 0; return code;
} }
static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) { static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) {
...@@ -416,7 +506,39 @@ static int32_t mndProcessAlterStbMsg(SMnodeMsg *pMsg) { ...@@ -416,7 +506,39 @@ static int32_t mndProcessAlterStbMsg(SMnodeMsg *pMsg) {
static int32_t mndProcessAlterStbInRsp(SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessAlterStbInRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndSetDropStbRedoLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1;
return 0;
}
static int32_t mndSetDropStbUndoLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
SSdbRaw *pUndoRaw = mndStbActionEncode(pStb);
if (pUndoRaw == NULL) return -1;
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY) != 0) return -1;
return 0;
}
static int32_t mndSetDropStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
return 0;
}
static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) { return 0; }
static int32_t mndSetDropStbUndoActions(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) { return 0; }
static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pStb) { static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pStb) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("stb:%s, failed to drop since %s", pStb->name, terrstr()); mError("stb:%s, failed to drop since %s", pStb->name, terrstr());
...@@ -424,36 +546,39 @@ static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pStb) { ...@@ -424,36 +546,39 @@ static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pStb) {
} }
mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name); mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb); if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) {
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); goto DROP_STB_OVER;
mndTransDrop(pTrans);
return -1;
} }
sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING);
SSdbRaw *pUndoRaw = mndStbActionEncode(pStb); if (mndSetDropStbUndoLogs(pMnode, pTrans, pStb) != 0) {
if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) { mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr());
mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr()); goto DROP_STB_OVER;
mndTransDrop(pTrans);
return -1;
} }
sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY);
SSdbRaw *pCommitRaw = mndStbActionEncode(pStb); if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) {
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); goto DROP_STB_OVER;
mndTransDrop(pTrans); }
return -1;
if (mndSetDropStbRedoActions(pMnode, pTrans, pStb) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto DROP_STB_OVER;
}
if (mndSetDropStbUndoActions(pMnode, pTrans, pStb) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto DROP_STB_OVER;
} }
sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans); goto DROP_STB_OVER;
return -1;
} }
code = 0;
DROP_STB_OVER:
mndTransDrop(pTrans); mndTransDrop(pTrans);
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册