提交 d3927b54 编写于 作者: S Shengliang Guan

Improve the error handling of mnode

上级 c9b8e748
......@@ -22,121 +22,70 @@
extern "C" {
#endif
#define SDB_GET_INT64(pData, pRow, dataPos, val) \
{ \
if (sdbGetRawInt64(pRaw, dataPos, val) != 0) { \
tfree(pRow); \
return NULL; \
} \
dataPos += sizeof(int64_t); \
#define SDB_GET_VAL(pData, dataPos, val, pos, func, type) \
{ \
if (func(pRaw, dataPos, val) != 0) { \
goto pos; \
} \
dataPos += sizeof(type); \
}
#define SDB_GET_INT32(pData, pRow, dataPos, val) \
{ \
if (sdbGetRawInt32(pRaw, dataPos, val) != 0) { \
tfree(pRow); \
return NULL; \
} \
dataPos += sizeof(int32_t); \
}
#define SDB_GET_INT16(pData, pRow, dataPos, val) \
{ \
if (sdbGetRawInt16(pRaw, dataPos, val) != 0) { \
tfree(pRow); \
return NULL; \
} \
dataPos += sizeof(int16_t); \
}
#define SDB_GET_INT8(pData, pRow, dataPos, val) \
{ \
if (sdbGetRawInt8(pRaw, dataPos, val) != 0) { \
tfree(pRow); \
return NULL; \
} \
dataPos += sizeof(int8_t); \
}
#define SDB_GET_BINARY(pRaw, pRow, dataPos, val, valLen) \
#define SDB_GET_BINARY(pRaw, dataPos, val, valLen, pos) \
{ \
if (sdbGetRawBinary(pRaw, dataPos, val, valLen) != 0) { \
tfree(pRow); \
return NULL; \
goto pos; \
} \
dataPos += valLen; \
}
#define SDB_GET_RESERVE(pRaw, pRow, dataPos, valLen) \
{ \
char val[valLen] = {0}; \
if (sdbGetRawBinary(pRaw, dataPos, val, valLen) != 0) { \
tfree(pRow); \
return NULL; \
} \
dataPos += valLen; \
}
#define SDB_GET_INT64(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt64, int64_t)
#define SDB_SET_INT64(pRaw, dataPos, val) \
{ \
if (sdbSetRawInt64(pRaw, dataPos, val) != 0) { \
sdbFreeRaw(pRaw); \
return NULL; \
} \
dataPos += sizeof(int64_t); \
}
#define SDB_GET_INT32(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt32, int32_t)
#define SDB_SET_INT32(pRaw, dataPos, val) \
{ \
if (sdbSetRawInt32(pRaw, dataPos, val) != 0) { \
sdbFreeRaw(pRaw); \
return NULL; \
} \
dataPos += sizeof(int32_t); \
}
#define SDB_GET_INT16(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt16, int16_t)
#define SDB_GET_INT8(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt8, int8_t)
#define SDB_SET_INT16(pRaw, dataPos, val) \
{ \
if (sdbSetRawInt16(pRaw, dataPos, val) != 0) { \
sdbFreeRaw(pRaw); \
return NULL; \
} \
dataPos += sizeof(int16_t); \
#define SDB_GET_RESERVE(pRaw, dataPos, valLen, pos) \
{ \
char val[valLen] = {0}; \
SDB_GET_BINARY(pRaw, dataPos, val, valLen, pos) \
}
#define SDB_SET_INT8(pRaw, dataPos, val) \
{ \
if (sdbSetRawInt8(pRaw, dataPos, val) != 0) { \
sdbFreeRaw(pRaw); \
return NULL; \
} \
dataPos += sizeof(int8_t); \
#define SDB_SET_VAL(pRaw, dataPos, val, pos, func, type) \
{ \
if (func(pRaw, dataPos, val) != 0) { \
goto pos; \
} \
dataPos += sizeof(type); \
}
#define SDB_SET_BINARY(pRaw, dataPos, val, valLen) \
#define SDB_SET_INT64(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt64, int64_t)
#define SDB_SET_INT32(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt32, int32_t)
#define SDB_SET_INT16(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt16, int16_t)
#define SDB_SET_INT8(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt8, int8_t)
#define SDB_SET_BINARY(pRaw, dataPos, val, valLen, pos) \
{ \
if (sdbSetRawBinary(pRaw, dataPos, val, valLen) != 0) { \
sdbFreeRaw(pRaw); \
return NULL; \
goto pos; \
} \
dataPos += valLen; \
}
#define SDB_SET_RESERVE(pRaw, dataPos, valLen) \
{ \
char val[valLen] = {0}; \
if (sdbSetRawBinary(pRaw, dataPos, val, valLen) != 0) { \
sdbFreeRaw(pRaw); \
return NULL; \
} \
dataPos += valLen; \
#define SDB_SET_RESERVE(pRaw, dataPos, valLen, pos) \
{ \
char val[valLen] = {0}; \
SDB_SET_BINARY(pRaw, dataPos, val, valLen, pos) \
}
#define SDB_SET_DATALEN(pRaw, dataLen) \
#define SDB_SET_DATALEN(pRaw, dataLen, pos) \
{ \
if (sdbSetRawDataLen(pRaw, dataLen) != 0) { \
sdbFreeRaw(pRaw); \
return NULL; \
goto pos; \
} \
}
......
......@@ -115,8 +115,8 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 2);
CheckInt32(1);
CheckInt32(2);
CheckInt32(3);
CheckInt32(0);
CheckInt32(0);
CheckInt16(1);
......
......@@ -66,75 +66,100 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) {
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mDebug("acct:%s, will be created while deploy sdb", acctObj.acct);
mDebug("acct:%s, will be created while deploy sdb, raw:%p", acctObj.acct, pRaw);
return sdbWrite(pMnode->pSdb, pRaw);
}
static SSdbRaw *mndAcctActionEncode(SAcctObj *pAcct) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRaw *pRaw = sdbAllocRaw(SDB_ACCT, TSDB_ACCT_VER_NUMBER, sizeof(SAcctObj) + TSDB_ACCT_RESERVE_SIZE);
if (pRaw == NULL) return NULL;
if (pRaw == NULL) goto ACCT_ENCODE_OVER;
int32_t dataPos = 0;
SDB_SET_BINARY(pRaw, dataPos, pAcct->acct, TSDB_USER_LEN)
SDB_SET_INT64(pRaw, dataPos, pAcct->createdTime)
SDB_SET_INT64(pRaw, dataPos, pAcct->updateTime)
SDB_SET_INT32(pRaw, dataPos, pAcct->acctId)
SDB_SET_INT32(pRaw, dataPos, pAcct->status)
SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxUsers)
SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxDbs)
SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxTimeSeries)
SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxStreams)
SDB_SET_INT64(pRaw, dataPos, pAcct->cfg.maxStorage)
SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.accessState)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_ACCT_RESERVE_SIZE)
SDB_SET_DATALEN(pRaw, dataPos);
SDB_SET_BINARY(pRaw, dataPos, pAcct->acct, TSDB_USER_LEN, ACCT_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pAcct->createdTime, ACCT_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pAcct->updateTime, ACCT_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pAcct->acctId, ACCT_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pAcct->status, ACCT_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxUsers, ACCT_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxDbs, ACCT_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxTimeSeries, ACCT_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxStreams, ACCT_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pAcct->cfg.maxStorage, ACCT_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.accessState, ACCT_ENCODE_OVER)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_ACCT_RESERVE_SIZE, ACCT_ENCODE_OVER)
SDB_SET_DATALEN(pRaw, dataPos, ACCT_ENCODE_OVER)
terrno = 0;
ACCT_ENCODE_OVER:
if (terrno != 0) {
mError("acct:%s, failed to encode to raw:%p since %s", pAcct->acct, pRaw, terrstr());
sdbFreeRaw(pRaw);
return NULL;
}
mTrace("acct:%s, encode to raw:%p, row:%p", pAcct->acct, pRaw, pAcct);
return pRaw;
}
static SSdbRow *mndAcctActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto ACCT_DECODE_OVER;
if (sver != TSDB_ACCT_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
mError("failed to decode acct since %s", terrstr());
return NULL;
goto ACCT_DECODE_OVER;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SAcctObj));
SSdbRow *pRow = sdbAllocRow(sizeof(SAcctObj));
if (pRow == NULL) goto ACCT_DECODE_OVER;
SAcctObj *pAcct = sdbGetRowObj(pRow);
if (pAcct == NULL) return NULL;
if (pAcct == NULL) goto ACCT_DECODE_OVER;
int32_t dataPos = 0;
SDB_GET_BINARY(pRaw, pRow, dataPos, pAcct->acct, TSDB_USER_LEN)
SDB_GET_INT64(pRaw, pRow, dataPos, &pAcct->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pAcct->updateTime)
SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->acctId)
SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->status)
SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->cfg.maxUsers)
SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->cfg.maxDbs)
SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->cfg.maxTimeSeries)
SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->cfg.maxStreams)
SDB_GET_INT64(pRaw, pRow, dataPos, &pAcct->cfg.maxStorage)
SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->cfg.accessState)
SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_ACCT_RESERVE_SIZE)
SDB_GET_BINARY(pRaw, dataPos, pAcct->acct, TSDB_USER_LEN, ACCT_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pAcct->createdTime, ACCT_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pAcct->updateTime, ACCT_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pAcct->acctId, ACCT_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pAcct->status, ACCT_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pAcct->cfg.maxUsers, ACCT_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pAcct->cfg.maxDbs, ACCT_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pAcct->cfg.maxTimeSeries, ACCT_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pAcct->cfg.maxStreams, ACCT_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pAcct->cfg.maxStorage, ACCT_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pAcct->cfg.accessState, ACCT_DECODE_OVER)
SDB_GET_RESERVE(pRaw, dataPos, TSDB_ACCT_RESERVE_SIZE, ACCT_DECODE_OVER)
terrno = 0;
ACCT_DECODE_OVER:
if (terrno != 0) {
mError("acct:%s, failed to decode from raw:%p since %s", pAcct->acct, pRaw, terrstr());
tfree(pRow);
return NULL;
}
mTrace("acct:%s, decode from raw:%p, row:%p", pAcct->acct, pRaw, pAcct);
return pRow;
}
static int32_t mndAcctActionInsert(SSdb *pSdb, SAcctObj *pAcct) {
mTrace("acct:%s, perform insert action", pAcct->acct);
mTrace("acct:%s, perform insert action, row:%p", pAcct->acct, pAcct);
return 0;
}
static int32_t mndAcctActionDelete(SSdb *pSdb, SAcctObj *pAcct) {
mTrace("acct:%s, perform delete action", pAcct->acct);
mTrace("acct:%s, perform delete action, row:%p", pAcct->acct, pAcct);
return 0;
}
static int32_t mndAcctActionUpdate(SSdb *pSdb, SAcctObj *pOldAcct, SAcctObj *pNewAcct) {
mTrace("acct:%s, perform update action", pOldAcct->acct);
mTrace("acct:%s, perform update action, old_row:%p new_row:%p", pOldAcct->acct, pOldAcct, pNewAcct);
pOldAcct->updateTime = pNewAcct->updateTime;
pOldAcct->status = pNewAcct->status;
......
......@@ -58,9 +58,9 @@ int32_t mndInitBnode(SMnode *pMnode) {
void mndCleanupBnode(SMnode *pMnode) {}
static SBnodeObj *mndAcquireBnode(SMnode *pMnode, int32_t snodeId) {
static SBnodeObj *mndAcquireBnode(SMnode *pMnode, int32_t bnodeId) {
SSdb *pSdb = pMnode->pSdb;
SBnodeObj *pObj = sdbAcquire(pSdb, SDB_BNODE, &snodeId);
SBnodeObj *pObj = sdbAcquire(pSdb, SDB_BNODE, &bnodeId);
if (pObj == NULL) {
terrno = TSDB_CODE_MND_BNODE_NOT_EXIST;
}
......@@ -73,47 +73,72 @@ static void mndReleaseBnode(SMnode *pMnode, SBnodeObj *pObj) {
}
static SSdbRaw *mndBnodeActionEncode(SBnodeObj *pObj) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRaw *pRaw = sdbAllocRaw(SDB_BNODE, TSDB_BNODE_VER_NUMBER, sizeof(SBnodeObj) + TSDB_BNODE_RESERVE_SIZE);
if (pRaw == NULL) return NULL;
if (pRaw == NULL) goto BNODE_ENCODE_OVER;
int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pObj->id);
SDB_SET_INT64(pRaw, dataPos, pObj->createdTime)
SDB_SET_INT64(pRaw, dataPos, pObj->updateTime)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_BNODE_RESERVE_SIZE)
SDB_SET_INT32(pRaw, dataPos, pObj->id, BNODE_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, BNODE_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, BNODE_ENCODE_OVER)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_BNODE_RESERVE_SIZE, BNODE_ENCODE_OVER)
terrno = 0;
BNODE_ENCODE_OVER:
if (terrno != 0) {
mError("bnode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
sdbFreeRaw(pRaw);
return NULL;
}
mTrace("bnode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
return pRaw;
}
static SSdbRow *mndBnodeActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto BNODE_DECODE_OVER;
if (sver != TSDB_BNODE_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
mError("failed to decode snode since %s", terrstr());
return NULL;
goto BNODE_DECODE_OVER;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SBnodeObj));
SSdbRow *pRow = sdbAllocRow(sizeof(SBnodeObj));
if (pRow == NULL) goto BNODE_DECODE_OVER;
SBnodeObj *pObj = sdbGetRowObj(pRow);
if (pObj == NULL) return NULL;
if (pObj == NULL) goto BNODE_DECODE_OVER;
int32_t dataPos = 0;
SDB_GET_INT32(pRaw, pRow, dataPos, &pObj->id)
SDB_GET_INT64(pRaw, pRow, dataPos, &pObj->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pObj->updateTime)
SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_BNODE_RESERVE_SIZE)
SDB_GET_INT32(pRaw, dataPos, &pObj->id, BNODE_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, BNODE_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, BNODE_DECODE_OVER)
SDB_GET_RESERVE(pRaw, dataPos, TSDB_BNODE_RESERVE_SIZE, BNODE_DECODE_OVER)
terrno = 0;
BNODE_DECODE_OVER:
if (terrno != 0) {
mError("bnode:%d, failed to decode from raw:%p since %s", pObj->id, pRaw, terrstr());
tfree(pRow);
return NULL;
}
mTrace("bnode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
return pRow;
}
static int32_t mndBnodeActionInsert(SSdb *pSdb, SBnodeObj *pObj) {
mTrace("snode:%d, perform insert action", pObj->id);
mTrace("bnode:%d, perform insert action, row:%p", pObj->id, pObj);
pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id);
if (pObj->pDnode == NULL) {
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
mError("snode:%d, failed to perform insert action since %s", pObj->id, terrstr());
mError("bnode:%d, failed to perform insert action since %s", pObj->id, terrstr());
return -1;
}
......@@ -121,7 +146,7 @@ static int32_t mndBnodeActionInsert(SSdb *pSdb, SBnodeObj *pObj) {
}
static int32_t mndBnodeActionDelete(SSdb *pSdb, SBnodeObj *pObj) {
mTrace("snode:%d, perform delete action", pObj->id);
mTrace("bnode:%d, perform delete action, row:%p", pObj->id, pObj);
if (pObj->pDnode != NULL) {
sdbRelease(pSdb, pObj->pDnode);
pObj->pDnode = NULL;
......@@ -131,7 +156,7 @@ static int32_t mndBnodeActionDelete(SSdb *pSdb, SBnodeObj *pObj) {
}
static int32_t mndBnodeActionUpdate(SSdb *pSdb, SBnodeObj *pOldBnode, SBnodeObj *pNewBnode) {
mTrace("snode:%d, perform update action", pOldBnode->id);
mTrace("bnode:%d, perform update action, old_row:%p new_row:%p", pOldBnode->id, pOldBnode, pNewBnode);
pOldBnode->updateTime = pNewBnode->updateTime;
return 0;
}
......@@ -175,30 +200,30 @@ static int32_t mndSetCreateBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S
}
static int32_t mndCreateBnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SMCreateBnodeMsg *pCreate) {
SBnodeObj snodeObj = {0};
snodeObj.id = pDnode->id;
snodeObj.createdTime = taosGetTimestampMs();
snodeObj.updateTime = snodeObj.createdTime;
SBnodeObj bnodeObj = {0};
bnodeObj.id = pDnode->id;
bnodeObj.createdTime = taosGetTimestampMs();
bnodeObj.updateTime = bnodeObj.createdTime;
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
if (pTrans == NULL) {
mError("snode:%d, failed to create since %s", pCreate->dnodeId, terrstr());
mError("bnode:%d, failed to create since %s", pCreate->dnodeId, terrstr());
goto CREATE_BNODE_OVER;
}
mDebug("trans:%d, used to create snode:%d", pTrans->id, pCreate->dnodeId);
mDebug("trans:%d, used to create bnode:%d", pTrans->id, pCreate->dnodeId);
if (mndSetCreateBnodeRedoLogs(pTrans, &snodeObj) != 0) {
if (mndSetCreateBnodeRedoLogs(pTrans, &bnodeObj) != 0) {
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
goto CREATE_BNODE_OVER;
}
if (mndSetCreateBnodeCommitLogs(pTrans, &snodeObj) != 0) {
if (mndSetCreateBnodeCommitLogs(pTrans, &bnodeObj) != 0) {
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
goto CREATE_BNODE_OVER;
}
if (mndSetCreateBnodeRedoActions(pTrans, pDnode, &snodeObj) != 0) {
if (mndSetCreateBnodeRedoActions(pTrans, pDnode, &bnodeObj) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto CREATE_BNODE_OVER;
}
......@@ -221,18 +246,18 @@ static int32_t mndProcessCreateBnodeReq(SMnodeMsg *pMsg) {
pCreate->dnodeId = htonl(pCreate->dnodeId);
mDebug("snode:%d, start to create", pCreate->dnodeId);
mDebug("bnode:%d, start to create", pCreate->dnodeId);
SBnodeObj *pObj = mndAcquireBnode(pMnode, pCreate->dnodeId);
if (pObj != NULL) {
mError("snode:%d, snode already exist", pObj->id);
mError("bnode:%d, bnode already exist", pObj->id);
mndReleaseBnode(pMnode, pObj);
return -1;
}
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId);
if (pDnode == NULL) {
mError("snode:%d, dnode not exist", pCreate->dnodeId);
mError("bnode:%d, dnode not exist", pCreate->dnodeId);
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
return -1;
}
......@@ -241,7 +266,7 @@ static int32_t mndProcessCreateBnodeReq(SMnodeMsg *pMsg) {
mndReleaseDnode(pMnode, pDnode);
if (code != 0) {
mError("snode:%d, failed to create since %s", pCreate->dnodeId, terrstr());
mError("bnode:%d, failed to create since %s", pCreate->dnodeId, terrstr());
return -1;
}
......@@ -290,11 +315,11 @@ static int32_t mndDropBnode(SMnode *pMnode, SMnodeMsg *pMsg, SBnodeObj *pObj) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
if (pTrans == NULL) {
mError("snode:%d, failed to drop since %s", pObj->id, terrstr());
mError("bnode:%d, failed to drop since %s", pObj->id, terrstr());
goto DROP_BNODE_OVER;
}
mDebug("trans:%d, used to drop snode:%d", pTrans->id, pObj->id);
mDebug("trans:%d, used to drop bnode:%d", pTrans->id, pObj->id);
if (mndSetDropBnodeRedoLogs(pTrans, pObj) != 0) {
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
......@@ -328,24 +353,24 @@ static int32_t mndProcessDropBnodeReq(SMnodeMsg *pMsg) {
SMDropBnodeMsg *pDrop = pMsg->rpcMsg.pCont;
pDrop->dnodeId = htonl(pDrop->dnodeId);
mDebug("snode:%d, start to drop", pDrop->dnodeId);
mDebug("bnode:%d, start to drop", pDrop->dnodeId);
if (pDrop->dnodeId <= 0) {
terrno = TSDB_CODE_SDB_APP_ERROR;
mError("snode:%d, failed to drop since %s", pDrop->dnodeId, terrstr());
mError("bnode:%d, failed to drop since %s", pDrop->dnodeId, terrstr());
return -1;
}
SBnodeObj *pObj = mndAcquireBnode(pMnode, pDrop->dnodeId);
if (pObj == NULL) {
mError("snode:%d, not exist", pDrop->dnodeId);
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
mError("bnode:%d, not exist", pDrop->dnodeId);
terrno = TSDB_CODE_MND_BNODE_NOT_EXIST;
return -1;
}
int32_t code = mndDropBnode(pMnode, pMsg, pObj);
if (code != 0) {
mError("snode:%d, failed to drop since %s", pMnode->dnodeId, terrstr());
mError("bnode:%d, failed to drop since %s", pMnode->dnodeId, terrstr());
return -1;
}
......
......@@ -63,55 +63,80 @@ int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len) {
}
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, TSDB_CLUSTER_VER_NUMBE, sizeof(SClusterObj) + TSDB_CLUSTER_RESERVE_SIZE);
if (pRaw == NULL) return NULL;
if (pRaw == NULL) goto CLUSTER_ENCODE_OVER;
int32_t dataPos = 0;
SDB_SET_INT64(pRaw, dataPos, pCluster->id);
SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime)
SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime)
SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_CLUSTER_RESERVE_SIZE)
SDB_SET_INT64(pRaw, dataPos, pCluster->id, CLUSTER_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime, CLUSTER_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime, CLUSTER_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, CLUSTER_ENCODE_OVER)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_CLUSTER_RESERVE_SIZE, CLUSTER_ENCODE_OVER)
terrno = 0;
CLUSTER_ENCODE_OVER:
if (terrno != 0) {
mError("cluster:%" PRId64 ", failed to encode to raw:%p since %s", pCluster->id, pRaw, terrstr());
sdbFreeRaw(pRaw);
return NULL;
}
mTrace("cluster:%" PRId64 ", encode to raw:%p, row:%p", pCluster->id, pRaw, pCluster);
return pRaw;
}
static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CLUSTER_DECODE_OVER;
if (sver != TSDB_CLUSTER_VER_NUMBE) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
mError("failed to decode cluster since %s", terrstr());
return NULL;
goto CLUSTER_DECODE_OVER;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SClusterObj));
SSdbRow *pRow = sdbAllocRow(sizeof(SClusterObj));
if (pRow == NULL) goto CLUSTER_DECODE_OVER;
SClusterObj *pCluster = sdbGetRowObj(pRow);
if (pCluster == NULL) return NULL;
if (pCluster == NULL) goto CLUSTER_DECODE_OVER;
int32_t dataPos = 0;
SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->id)
SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->updateTime)
SDB_GET_BINARY(pRaw, pRow, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN)
SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_CLUSTER_RESERVE_SIZE)
SDB_GET_INT64(pRaw, dataPos, &pCluster->id, CLUSTER_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pCluster->createdTime, CLUSTER_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pCluster->updateTime, CLUSTER_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, CLUSTER_DECODE_OVER)
SDB_GET_RESERVE(pRaw, dataPos, TSDB_CLUSTER_RESERVE_SIZE, CLUSTER_DECODE_OVER)
terrno = 0;
CLUSTER_DECODE_OVER:
if (terrno != 0) {
mError("cluster:%" PRId64 ", failed to decode from raw:%p since %s", pCluster->id, pRaw, terrstr());
tfree(pRow);
return NULL;
}
mTrace("cluster:%" PRId64 ", decode from raw:%p, row:%p", pCluster->id, pRaw, pCluster);
return pRow;
}
static int32_t mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster) {
mTrace("cluster:%" PRId64 ", perform insert action", pCluster->id);
mTrace("cluster:%" PRId64 ", perform insert action, row:%p", pCluster->id, pCluster);
return 0;
}
static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster) {
mTrace("cluster:%" PRId64 ", perform delete action", pCluster->id);
mTrace("cluster:%" PRId64 ", perform delete action, row:%p", pCluster->id, pCluster);
return 0;
}
static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOldCluster, SClusterObj *pNewCluster) {
mTrace("cluster:%" PRId64 ", perform update action", pOldCluster->id);
static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOld, SClusterObj *pNew) {
mTrace("cluster:%" PRId64 ", perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew);
return 0;
}
......@@ -135,7 +160,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mDebug("cluster:%" PRId64 ", will be created while deploy sdb", clusterObj.id);
mDebug("cluster:%" PRId64 ", will be created while deploy sdb, raw:%p", clusterObj.id, pRaw);
return sdbWrite(pMnode->pSdb, pRaw);
}
......
......@@ -67,66 +67,87 @@ int32_t mndInitConsumer(SMnode *pMnode) {
void mndCleanupConsumer(SMnode *pMnode) {}
static SSdbRaw *mndConsumerActionEncode(SConsumerObj *pConsumer) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t size = sizeof(SConsumerObj) + MND_CONSUMER_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size);
if (pRaw == NULL) return NULL;
if (pRaw == NULL) goto CM_ENCODE_OVER;
int32_t dataPos = 0;
SDB_SET_BINARY(pRaw, dataPos, pConsumer->name, TSDB_TABLE_FNAME_LEN);
SDB_SET_BINARY(pRaw, dataPos, pConsumer->db, TSDB_DB_FNAME_LEN);
SDB_SET_INT64(pRaw, dataPos, pConsumer->createTime);
SDB_SET_INT64(pRaw, dataPos, pConsumer->updateTime);
SDB_SET_INT64(pRaw, dataPos, pConsumer->uid);
SDB_SET_BINARY(pRaw, dataPos, pConsumer->name, TSDB_TABLE_FNAME_LEN, CM_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pConsumer->db, TSDB_DB_FNAME_LEN, CM_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pConsumer->createTime, CM_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pConsumer->updateTime, CM_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pConsumer->uid, CM_ENCODE_OVER)
/*SDB_SET_INT64(pRaw, dataPos, pConsumer->dbUid);*/
SDB_SET_INT32(pRaw, dataPos, pConsumer->version);
SDB_SET_INT32(pRaw, dataPos, pConsumer->version, CM_ENCODE_OVER)
SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE);
SDB_SET_DATALEN(pRaw, dataPos);
SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER)
SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER)
CM_ENCODE_OVER:
if (terrno != 0) {
mError("consumer:%s, failed to encode to raw:%p since %s", pConsumer->name, pRaw, terrstr());
sdbFreeRaw(pRaw);
return NULL;
}
mTrace("consumer:%s, encode to raw:%p, row:%p", pConsumer->name, pRaw, pConsumer);
return pRaw;
}
static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CONSUME_DECODE_OVER;
if (sver != MND_CONSUMER_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
mError("failed to decode consumer since %s", terrstr());
return NULL;
goto CONSUME_DECODE_OVER;
}
int32_t size = sizeof(SConsumerObj) + TSDB_MAX_COLUMNS * sizeof(SSchema);
SSdbRow *pRow = sdbAllocRow(size);
int32_t size = sizeof(SConsumerObj) + TSDB_MAX_COLUMNS * sizeof(SSchema);
SSdbRow *pRow = sdbAllocRow(size);
if (pRow == NULL) goto CONSUME_DECODE_OVER;
SConsumerObj *pConsumer = sdbGetRowObj(pRow);
if (pConsumer == NULL) return NULL;
if (pConsumer == NULL) goto CONSUME_DECODE_OVER;
int32_t dataPos = 0;
SDB_GET_BINARY(pRaw, pRow, dataPos, pConsumer->name, TSDB_TABLE_FNAME_LEN);
SDB_GET_BINARY(pRaw, pRow, dataPos, pConsumer->db, TSDB_DB_FNAME_LEN);
SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->createTime);
SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->updateTime);
SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->uid);
SDB_GET_BINARY(pRaw, dataPos, pConsumer->name, TSDB_TABLE_FNAME_LEN, CONSUME_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pConsumer->db, TSDB_DB_FNAME_LEN, CONSUME_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pConsumer->createTime, CONSUME_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pConsumer->updateTime, CONSUME_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pConsumer->uid, CONSUME_DECODE_OVER)
/*SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->dbUid);*/
SDB_GET_INT32(pRaw, pRow, dataPos, &pConsumer->version);
SDB_GET_RESERVE(pRaw, pRow, dataPos, MND_CONSUMER_RESERVE_SIZE);
SDB_GET_INT32(pRaw, dataPos, &pConsumer->version, CONSUME_DECODE_OVER)
SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CONSUME_DECODE_OVER)
terrno = 0;
CONSUME_DECODE_OVER:
if (terrno != 0) {
mError("consumer:%s, failed to decode from raw:%p since %s", pConsumer->name, pRaw, terrstr());
tfree(pRow);
return NULL;
}
mTrace("consumer:%s, decode from raw:%p, row:%p", pConsumer->name, pRaw, pConsumer);
return pRow;
}
static int32_t mndConsumerActionInsert(SSdb *pSdb, SConsumerObj *pConsumer) {
mTrace("consumer:%s, perform insert action", pConsumer->name);
mTrace("consumer:%s, perform insert action, row:%p", pConsumer->name, pConsumer);
return 0;
}
static int32_t mndConsumerActionDelete(SSdb *pSdb, SConsumerObj *pConsumer) {
mTrace("consumer:%s, perform delete action", pConsumer->name);
mTrace("consumer:%s, perform delete action, row:%p", pConsumer->name, pConsumer);
return 0;
}
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SConsumerObj *pOldConsumer, SConsumerObj *pNewConsumer) {
mTrace("consumer:%s, perform update action", pOldConsumer->name);
mTrace("consumer:%s, perform update action, old_row:%p new_row:%p", pOldConsumer->name, pOldConsumer, pNewConsumer);
atomic_exchange_32(&pOldConsumer->updateTime, pNewConsumer->updateTime);
atomic_exchange_32(&pOldConsumer->version, pNewConsumer->version);
......
......@@ -65,100 +65,125 @@ int32_t mndInitDb(SMnode *pMnode) {
void mndCleanupDb(SMnode *pMnode) {}
static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRaw *pRaw = sdbAllocRaw(SDB_DB, TSDB_DB_VER_NUMBER, sizeof(SDbObj) + TSDB_DB_RESERVE_SIZE);
if (pRaw == NULL) return NULL;
if (pRaw == NULL) goto DB_ENCODE_OVER;
int32_t dataPos = 0;
SDB_SET_BINARY(pRaw, dataPos, pDb->name, TSDB_DB_FNAME_LEN)
SDB_SET_BINARY(pRaw, dataPos, pDb->acct, TSDB_USER_LEN)
SDB_SET_INT64(pRaw, dataPos, pDb->createdTime)
SDB_SET_INT64(pRaw, dataPos, pDb->updateTime)
SDB_SET_INT64(pRaw, dataPos, pDb->uid)
SDB_SET_INT32(pRaw, dataPos, pDb->cfgVersion)
SDB_SET_INT32(pRaw, dataPos, pDb->vgVersion)
SDB_SET_INT8(pRaw, dataPos, pDb->hashMethod)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.numOfVgroups)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.cacheBlockSize)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.totalBlocks)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysPerFile)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep0)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep1)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep2)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.minRows)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.maxRows)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.commitTime)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.fsyncPeriod)
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.walLevel)
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.precision)
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.compression)
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.replications)
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.quorum)
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.update)
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.cacheLastRow)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_DB_RESERVE_SIZE)
SDB_SET_DATALEN(pRaw, dataPos);
SDB_SET_BINARY(pRaw, dataPos, pDb->name, TSDB_DB_FNAME_LEN, DB_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pDb->acct, TSDB_USER_LEN, DB_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pDb->createdTime, DB_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pDb->updateTime, DB_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pDb->uid, DB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfgVersion, DB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->vgVersion, DB_ENCODE_OVER)
SDB_SET_INT8(pRaw, dataPos, pDb->hashMethod, DB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.numOfVgroups, DB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.cacheBlockSize, DB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.totalBlocks, DB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysPerFile, DB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep0, DB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep1, DB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep2, DB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.minRows, DB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.maxRows, DB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.commitTime, DB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.fsyncPeriod, DB_ENCODE_OVER)
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.walLevel, DB_ENCODE_OVER)
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.precision, DB_ENCODE_OVER)
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.compression, DB_ENCODE_OVER)
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.replications, DB_ENCODE_OVER)
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.quorum, DB_ENCODE_OVER)
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.update, DB_ENCODE_OVER)
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.cacheLastRow, DB_ENCODE_OVER)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_DB_RESERVE_SIZE, DB_ENCODE_OVER)
SDB_SET_DATALEN(pRaw, dataPos, DB_ENCODE_OVER)
terrno = 0;
DB_ENCODE_OVER:
if (terrno != 0) {
mError("db:%s, failed to encode to raw:%p since %s", pDb->name, pRaw, terrstr());
sdbFreeRaw(pRaw);
return NULL;
}
mTrace("db:%s, encode to raw:%p, row:%p", pDb->name, pRaw, pDb);
return pRaw;
}
static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto DB_DECODE_OVER;
if (sver != TSDB_DB_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
mError("failed to decode db since %s", terrstr());
return NULL;
goto DB_DECODE_OVER;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SDbObj));
SDbObj *pDb = sdbGetRowObj(pRow);
if (pDb == NULL) return NULL;
if (pRow == NULL) goto DB_DECODE_OVER;
SDbObj *pDb = sdbGetRowObj(pRow);
if (pDb == NULL) goto DB_DECODE_OVER;
int32_t dataPos = 0;
SDB_GET_BINARY(pRaw, pRow, dataPos, pDb->name, TSDB_DB_FNAME_LEN)
SDB_GET_BINARY(pRaw, pRow, dataPos, pDb->acct, TSDB_USER_LEN)
SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->updateTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->uid)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfgVersion)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->vgVersion)
SDB_GET_INT8(pRaw, pRow, dataPos, &pDb->hashMethod)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.numOfVgroups)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.cacheBlockSize)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.totalBlocks)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.daysPerFile)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.daysToKeep0)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.daysToKeep1)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.daysToKeep2)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.minRows)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.maxRows)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.commitTime)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.fsyncPeriod)
SDB_GET_INT8(pRaw, pRow, dataPos, &pDb->cfg.walLevel)
SDB_GET_INT8(pRaw, pRow, dataPos, &pDb->cfg.precision)
SDB_GET_INT8(pRaw, pRow, dataPos, &pDb->cfg.compression)
SDB_GET_INT8(pRaw, pRow, dataPos, &pDb->cfg.replications)
SDB_GET_INT8(pRaw, pRow, dataPos, &pDb->cfg.quorum)
SDB_GET_INT8(pRaw, pRow, dataPos, &pDb->cfg.update)
SDB_GET_INT8(pRaw, pRow, dataPos, &pDb->cfg.cacheLastRow)
SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_DB_RESERVE_SIZE)
SDB_GET_BINARY(pRaw, dataPos, pDb->name, TSDB_DB_FNAME_LEN, DB_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pDb->acct, TSDB_USER_LEN, DB_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pDb->createdTime, DB_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pDb->updateTime, DB_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pDb->uid, DB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfgVersion, DB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->vgVersion, DB_DECODE_OVER)
SDB_GET_INT8(pRaw, dataPos, &pDb->hashMethod, DB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.numOfVgroups, DB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.cacheBlockSize, DB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.totalBlocks, DB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.daysPerFile, DB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.daysToKeep0, DB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.daysToKeep1, DB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.daysToKeep2, DB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.minRows, DB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.maxRows, DB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.commitTime, DB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.fsyncPeriod, DB_DECODE_OVER)
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.walLevel, DB_DECODE_OVER)
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.precision, DB_DECODE_OVER)
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.compression, DB_DECODE_OVER)
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.replications, DB_DECODE_OVER)
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.quorum, DB_DECODE_OVER)
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.update, DB_DECODE_OVER)
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.cacheLastRow, DB_DECODE_OVER)
SDB_GET_RESERVE(pRaw, dataPos, TSDB_DB_RESERVE_SIZE, DB_DECODE_OVER)
terrno = 0;
DB_DECODE_OVER:
if (terrno != 0) {
mError("db:%s, failed to decode from raw:%p since %s", pDb->name, pRaw, terrstr());
tfree(pRow);
return NULL;
}
mTrace("db:%s, decode from raw:%p, row:%p", pDb->name, pRaw, pDb);
return pRow;
}
static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb) {
mTrace("db:%s, perform insert action", pDb->name);
mTrace("db:%s, perform insert action, row:%p", pDb->name, pDb);
return 0;
}
static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb) {
mTrace("db:%s, perform delete action", pDb->name);
mTrace("db:%s, perform delete action, row:%p", pDb->name, pDb);
return 0;
}
static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOldDb, SDbObj *pNewDb) {
mTrace("db:%s, perform update action", pOldDb->name);
mTrace("db:%s, perform update action, old_row:%p new_row:%p", pOldDb->name, pOldDb, pNewDb);
pOldDb->updateTime = pNewDb->updateTime;
pOldDb->cfgVersion = pNewDb->cfgVersion;
pOldDb->vgVersion = pNewDb->vgVersion;
......
......@@ -100,65 +100,90 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
if (pRaw == NULL) return -1;
if (sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) return -1;
mDebug("dnode:%d, will be created while deploy sdb", dnodeObj.id);
mDebug("dnode:%d, will be created while deploy sdb, raw:%p", dnodeObj.id, pRaw);
return sdbWrite(pMnode->pSdb, pRaw);
}
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
if (pRaw == NULL) return NULL;
if (pRaw == NULL) goto DNODE_ENCODE_OVER;
int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pDnode->id);
SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime)
SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime)
SDB_SET_INT16(pRaw, dataPos, pDnode->port)
SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE)
SDB_SET_DATALEN(pRaw, dataPos);
SDB_SET_INT32(pRaw, dataPos, pDnode->id, DNODE_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, DNODE_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, DNODE_ENCODE_OVER)
SDB_SET_INT16(pRaw, dataPos, pDnode->port, DNODE_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, DNODE_ENCODE_OVER)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, DNODE_ENCODE_OVER)
SDB_SET_DATALEN(pRaw, dataPos, DNODE_ENCODE_OVER);
terrno = 0;
DNODE_ENCODE_OVER:
if (terrno != 0) {
mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
sdbFreeRaw(pRaw);
return NULL;
}
mTrace("dnode:%d, encode to raw:%p, row:%p", pDnode->id, pRaw, pDnode);
return pRaw;
}
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto DNODE_DECODE_OVER;
if (sver != TSDB_DNODE_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
mError("failed to decode dnode since %s", terrstr());
return NULL;
goto DNODE_DECODE_OVER;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SDnodeObj));
SSdbRow *pRow = sdbAllocRow(sizeof(SDnodeObj));
if (pRow == NULL) goto DNODE_DECODE_OVER;
SDnodeObj *pDnode = sdbGetRowObj(pRow);
if (pDnode == NULL) return NULL;
if (pDnode == NULL) goto DNODE_DECODE_OVER;
int32_t dataPos = 0;
SDB_GET_INT32(pRaw, pRow, dataPos, &pDnode->id)
SDB_GET_INT64(pRaw, pRow, dataPos, &pDnode->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pDnode->updateTime)
SDB_GET_INT16(pRaw, pRow, dataPos, &pDnode->port)
SDB_GET_BINARY(pRaw, pRow, dataPos, pDnode->fqdn, TSDB_FQDN_LEN)
SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_DNODE_RESERVE_SIZE)
SDB_GET_INT32(pRaw, dataPos, &pDnode->id, DNODE_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, DNODE_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, DNODE_DECODE_OVER)
SDB_GET_INT16(pRaw, dataPos, &pDnode->port, DNODE_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, DNODE_DECODE_OVER)
SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, DNODE_DECODE_OVER)
terrno = 0;
DNODE_DECODE_OVER:
if (terrno != 0) {
mError("dnode:%d, failed to decode from raw:%p since %s", pDnode->id, pRaw, terrstr());
tfree(pRow);
return NULL;
}
mTrace("dnode:%d, decode from raw:%p, row:%p", pDnode->id, pRaw, pDnode);
return pRow;
}
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
mTrace("dnode:%d, perform insert action", pDnode->id);
mTrace("dnode:%d, perform insert action, row:%p", pDnode->id, pDnode);
pDnode->offlineReason = DND_REASON_STATUS_NOT_RECEIVED;
snprintf(pDnode->ep, TSDB_EP_LEN, "%s:%u", pDnode->fqdn, pDnode->port);
return 0;
}
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
mTrace("dnode:%d, perform delete action", pDnode->id);
mTrace("dnode:%d, perform delete action, row:%p", pDnode->id, pDnode);
return 0;
}
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOldDnode, SDnodeObj *pNewDnode) {
mTrace("dnode:%d, perform update action", pOldDnode->id);
mTrace("dnode:%d, perform update action, old_row:%p new_row:%p", pOldDnode->id, pOldDnode, pNewDnode);
pOldDnode->updateTime = pNewDnode->updateTime;
return 0;
}
......
......@@ -58,76 +58,101 @@ int32_t mndInitFunc(SMnode *pMnode) {
void mndCleanupFunc(SMnode *pMnode) {}
static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t size = pFunc->commentSize + pFunc->codeSize + sizeof(SFuncObj);
SSdbRaw *pRaw = sdbAllocRaw(SDB_FUNC, SDB_FUNC_VER, size);
if (pRaw == NULL) return NULL;
if (pRaw == NULL) goto FUNC_ENCODE_OVER;
int32_t dataPos = 0;
SDB_SET_BINARY(pRaw, dataPos, pFunc->name, TSDB_FUNC_NAME_LEN)
SDB_SET_INT64(pRaw, dataPos, pFunc->createdTime)
SDB_SET_INT8(pRaw, dataPos, pFunc->funcType)
SDB_SET_INT8(pRaw, dataPos, pFunc->scriptType)
SDB_SET_INT8(pRaw, dataPos, pFunc->align)
SDB_SET_INT8(pRaw, dataPos, pFunc->outputType)
SDB_SET_INT32(pRaw, dataPos, pFunc->outputLen)
SDB_SET_INT32(pRaw, dataPos, pFunc->bufSize)
SDB_SET_INT64(pRaw, dataPos, pFunc->sigature)
SDB_SET_INT32(pRaw, dataPos, pFunc->commentSize)
SDB_SET_INT32(pRaw, dataPos, pFunc->codeSize)
SDB_SET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize)
SDB_SET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize)
SDB_SET_DATALEN(pRaw, dataPos);
SDB_SET_BINARY(pRaw, dataPos, pFunc->name, TSDB_FUNC_NAME_LEN, FUNC_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pFunc->createdTime, FUNC_ENCODE_OVER)
SDB_SET_INT8(pRaw, dataPos, pFunc->funcType, FUNC_ENCODE_OVER)
SDB_SET_INT8(pRaw, dataPos, pFunc->scriptType, FUNC_ENCODE_OVER)
SDB_SET_INT8(pRaw, dataPos, pFunc->align, FUNC_ENCODE_OVER)
SDB_SET_INT8(pRaw, dataPos, pFunc->outputType, FUNC_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pFunc->outputLen, FUNC_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pFunc->bufSize, FUNC_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pFunc->sigature, FUNC_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pFunc->commentSize, FUNC_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pFunc->codeSize, FUNC_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, FUNC_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, FUNC_ENCODE_OVER)
SDB_SET_DATALEN(pRaw, dataPos, FUNC_ENCODE_OVER);
terrno = 0;
FUNC_ENCODE_OVER:
if (terrno != 0) {
mError("func:%s, failed to encode to raw:%p since %s", pFunc->name, pRaw, terrstr());
sdbFreeRaw(pRaw);
return NULL;
}
mTrace("func:%d, encode to raw:%p, row:%p", pFunc->name, pRaw, pFunc);
return pRaw;
}
static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto FUNC_DECODE_OVER;
if (sver != SDB_FUNC_VER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
mError("failed to decode func since %s", terrstr());
return NULL;
goto FUNC_DECODE_OVER;
}
int32_t size = sizeof(SFuncObj) + TSDB_FUNC_COMMENT_LEN + TSDB_FUNC_CODE_LEN;
SSdbRow *pRow = sdbAllocRow(size);
int32_t size = sizeof(SFuncObj) + TSDB_FUNC_COMMENT_LEN + TSDB_FUNC_CODE_LEN;
SSdbRow *pRow = sdbAllocRow(size);
if (pRow == NULL) goto FUNC_DECODE_OVER;
SFuncObj *pFunc = sdbGetRowObj(pRow);
if (pFunc == NULL) return NULL;
if (pFunc == NULL) goto FUNC_DECODE_OVER;
char *tmp = (char *)pFunc + sizeof(SFuncObj);
int32_t dataPos = 0;
SDB_GET_BINARY(pRaw, pRow, dataPos, pFunc->name, TSDB_FUNC_NAME_LEN)
SDB_GET_INT64(pRaw, pRow, dataPos, &pFunc->createdTime)
SDB_GET_INT8(pRaw, pRow, dataPos, &pFunc->funcType)
SDB_GET_INT8(pRaw, pRow, dataPos, &pFunc->scriptType)
SDB_GET_INT8(pRaw, pRow, dataPos, &pFunc->align)
SDB_GET_INT8(pRaw, pRow, dataPos, &pFunc->outputType)
SDB_GET_INT32(pRaw, pRow, dataPos, &pFunc->outputLen)
SDB_GET_INT32(pRaw, pRow, dataPos, &pFunc->bufSize)
SDB_GET_INT64(pRaw, pRow, dataPos, &pFunc->sigature)
SDB_GET_INT32(pRaw, pRow, dataPos, &pFunc->commentSize)
SDB_GET_INT32(pRaw, pRow, dataPos, &pFunc->codeSize)
SDB_GET_BINARY(pRaw, pRow, dataPos, pFunc->pData, pFunc->commentSize + pFunc->codeSize)
SDB_GET_BINARY(pRaw, dataPos, pFunc->name, TSDB_FUNC_NAME_LEN, FUNC_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pFunc->createdTime, FUNC_DECODE_OVER)
SDB_GET_INT8(pRaw, dataPos, &pFunc->funcType, FUNC_DECODE_OVER)
SDB_GET_INT8(pRaw, dataPos, &pFunc->scriptType, FUNC_DECODE_OVER)
SDB_GET_INT8(pRaw, dataPos, &pFunc->align, FUNC_DECODE_OVER)
SDB_GET_INT8(pRaw, dataPos, &pFunc->outputType, FUNC_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pFunc->outputLen, FUNC_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pFunc->bufSize, FUNC_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pFunc->sigature, FUNC_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pFunc->commentSize, FUNC_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pFunc->codeSize, FUNC_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pFunc->pData, pFunc->commentSize + pFunc->codeSize, FUNC_DECODE_OVER)
pFunc->pComment = pFunc->pData;
pFunc->pCode = (pFunc->pData + pFunc->commentSize);
terrno = 0;
FUNC_DECODE_OVER:
if (terrno != 0) {
mError("func:%s, failed to decode from raw:%p since %s", pFunc->name, pRaw, terrstr());
tfree(pRow);
return NULL;
}
mTrace("func:%s, decode from raw:%p, row:%p", pFunc->name, pRaw, pFunc);
return pRow;
}
static int32_t mndFuncActionInsert(SSdb *pSdb, SFuncObj *pFunc) {
mTrace("func:%s, perform insert action", pFunc->name);
mTrace("func:%s, perform insert action, row:%p", pFunc->name, pFunc);
return 0;
}
static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc) {
mTrace("func:%s, perform delete action", pFunc->name);
mTrace("func:%s, perform delete action, row:%p", pFunc->name, pFunc);
return 0;
}
static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOldFunc, SFuncObj *pNewFunc) {
mTrace("func:%s, perform update action", pOldFunc->name);
mTrace("func:%s, perform update action, old_row:%p new_row:%p", pOldFunc->name, pOldFunc, pNewFunc);
return 0;
}
......
......@@ -117,50 +117,75 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mDebug("mnode:%d, will be created while deploy sdb", mnodeObj.id);
mDebug("mnode:%d, will be created while deploy sdb, raw:%p", mnodeObj.id, pRaw);
return sdbWrite(pMnode->pSdb, pRaw);
}
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRaw *pRaw = sdbAllocRaw(SDB_MNODE, TSDB_MNODE_VER_NUMBER, sizeof(SMnodeObj) + TSDB_MNODE_RESERVE_SIZE);
if (pRaw == NULL) return NULL;
if (pRaw == NULL) goto MNODE_ENCODE_OVER;
int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pObj->id);
SDB_SET_INT64(pRaw, dataPos, pObj->createdTime)
SDB_SET_INT64(pRaw, dataPos, pObj->updateTime)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_MNODE_RESERVE_SIZE)
SDB_SET_INT32(pRaw, dataPos, pObj->id, MNODE_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, MNODE_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, MNODE_ENCODE_OVER)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_MNODE_RESERVE_SIZE, MNODE_ENCODE_OVER)
terrno = 0;
MNODE_ENCODE_OVER:
if (terrno != 0) {
mError("mnode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
sdbFreeRaw(pRaw);
return NULL;
}
mTrace("mnode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
return pRaw;
}
static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sver != TSDB_MNODE_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
mError("failed to decode mnode since %s", terrstr());
return NULL;
goto MNODE_DECODE_OVER;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SMnodeObj));
SSdbRow *pRow = sdbAllocRow(sizeof(SMnodeObj));
if (pRow == NULL) goto MNODE_DECODE_OVER;
SMnodeObj *pObj = sdbGetRowObj(pRow);
if (pObj == NULL) return NULL;
if (pObj == NULL) goto MNODE_DECODE_OVER;
int32_t dataPos = 0;
SDB_GET_INT32(pRaw, pRow, dataPos, &pObj->id)
SDB_GET_INT64(pRaw, pRow, dataPos, &pObj->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pObj->updateTime)
SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_MNODE_RESERVE_SIZE)
SDB_GET_INT32(pRaw, dataPos, &pObj->id, MNODE_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, MNODE_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, MNODE_DECODE_OVER)
SDB_GET_RESERVE(pRaw, dataPos, TSDB_MNODE_RESERVE_SIZE, MNODE_DECODE_OVER)
terrno = 0;
MNODE_DECODE_OVER:
if (terrno != 0) {
mError("mnode:%d, failed to decode from raw:%p since %s", pObj->id, pRaw, terrstr());
tfree(pRow);
return NULL;
}
mTrace("mnode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
return pRow;
}
static void mnodeResetMnode(SMnodeObj *pObj) { pObj->role = TAOS_SYNC_STATE_FOLLOWER; }
static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) {
mTrace("mnode:%d, perform insert action", pObj->id);
mTrace("mnode:%d, perform insert action, row:%p", pObj->id, pObj);
pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id);
if (pObj->pDnode == NULL) {
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
......@@ -173,7 +198,7 @@ static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) {
}
static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) {
mTrace("mnode:%d, perform delete action", pObj->id);
mTrace("mnode:%d, perform delete action, row:%p", pObj->id, pObj);
if (pObj->pDnode != NULL) {
sdbRelease(pSdb, pObj->pDnode);
pObj->pDnode = NULL;
......@@ -183,7 +208,7 @@ static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) {
}
static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOldMnode, SMnodeObj *pNewMnode) {
mTrace("mnode:%d, perform update action", pOldMnode->id);
mTrace("mnode:%d, perform update action, old_row:%p new_row:%p", pOldMnode->id, pOldMnode, pNewMnode);
pOldMnode->updateTime = pNewMnode->updateTime;
return 0;
}
......@@ -370,7 +395,7 @@ CREATE_MNODE_OVER:
}
static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SMnode *pMnode = pMsg->pMnode;
SMCreateMnodeMsg *pCreate = pMsg->rpcMsg.pCont;
pCreate->dnodeId = htonl(pCreate->dnodeId);
......@@ -537,7 +562,7 @@ DROP_MNODE_OVER:
}
static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SMnode *pMnode = pMsg->pMnode;
SMDropMnodeMsg *pDrop = pMsg->rpcMsg.pCont;
pDrop->dnodeId = htonl(pDrop->dnodeId);
......
......@@ -73,43 +73,68 @@ static void mndReleaseQnode(SMnode *pMnode, SQnodeObj *pObj) {
}
static SSdbRaw *mndQnodeActionEncode(SQnodeObj *pObj) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRaw *pRaw = sdbAllocRaw(SDB_QNODE, TSDB_QNODE_VER_NUMBER, sizeof(SQnodeObj) + TSDB_QNODE_RESERVE_SIZE);
if (pRaw == NULL) return NULL;
if (pRaw == NULL) goto QNODE_ENCODE_OVER;
int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pObj->id);
SDB_SET_INT64(pRaw, dataPos, pObj->createdTime)
SDB_SET_INT64(pRaw, dataPos, pObj->updateTime)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_QNODE_RESERVE_SIZE)
SDB_SET_INT32(pRaw, dataPos, pObj->id, QNODE_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, QNODE_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, QNODE_ENCODE_OVER)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_QNODE_RESERVE_SIZE, QNODE_ENCODE_OVER)
terrno = 0;
QNODE_ENCODE_OVER:
if (terrno != 0) {
mError("qnode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
sdbFreeRaw(pRaw);
return NULL;
}
mTrace("qnode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
return pRaw;
}
static SSdbRow *mndQnodeActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto QNODE_DECODE_OVER;
if (sver != TSDB_QNODE_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
mError("failed to decode qnode since %s", terrstr());
return NULL;
goto QNODE_DECODE_OVER;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SQnodeObj));
SSdbRow *pRow = sdbAllocRow(sizeof(SQnodeObj));
if (pRow == NULL) goto QNODE_DECODE_OVER;
SQnodeObj *pObj = sdbGetRowObj(pRow);
if (pObj == NULL) return NULL;
if (pObj == NULL) goto QNODE_DECODE_OVER;
int32_t dataPos = 0;
SDB_GET_INT32(pRaw, pRow, dataPos, &pObj->id)
SDB_GET_INT64(pRaw, pRow, dataPos, &pObj->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pObj->updateTime)
SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_QNODE_RESERVE_SIZE)
SDB_GET_INT32(pRaw, dataPos, &pObj->id, QNODE_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, QNODE_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, QNODE_DECODE_OVER)
SDB_GET_RESERVE(pRaw, dataPos, TSDB_QNODE_RESERVE_SIZE, QNODE_DECODE_OVER)
terrno = 0;
QNODE_DECODE_OVER:
if (terrno != 0) {
mError("qnode:%d, failed to decode from raw:%p since %s", pObj->id, pRaw, terrstr());
tfree(pRow);
return NULL;
}
mTrace("qnode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
return pRow;
}
static int32_t mndQnodeActionInsert(SSdb *pSdb, SQnodeObj *pObj) {
mTrace("qnode:%d, perform insert action", pObj->id);
mTrace("qnode:%d, perform insert action, row:%p", pObj->id, pObj);
pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id);
if (pObj->pDnode == NULL) {
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
......@@ -121,7 +146,7 @@ static int32_t mndQnodeActionInsert(SSdb *pSdb, SQnodeObj *pObj) {
}
static int32_t mndQnodeActionDelete(SSdb *pSdb, SQnodeObj *pObj) {
mTrace("qnode:%d, perform delete action", pObj->id);
mTrace("qnode:%d, perform delete action, row:%p", pObj->id, pObj);
if (pObj->pDnode != NULL) {
sdbRelease(pSdb, pObj->pDnode);
pObj->pDnode = NULL;
......@@ -131,7 +156,7 @@ static int32_t mndQnodeActionDelete(SSdb *pSdb, SQnodeObj *pObj) {
}
static int32_t mndQnodeActionUpdate(SSdb *pSdb, SQnodeObj *pOldQnode, SQnodeObj *pNewQnode) {
mTrace("qnode:%d, perform update action", pOldQnode->id);
mTrace("qnode:%d, perform update action, old_row:%p new_row:%p", pOldQnode->id, pOldQnode, pNewQnode);
pOldQnode->updateTime = pNewQnode->updateTime;
return 0;
}
......@@ -339,7 +364,7 @@ static int32_t mndProcessDropQnodeReq(SMnodeMsg *pMsg) {
SQnodeObj *pObj = mndAcquireQnode(pMnode, pDrop->dnodeId);
if (pObj == NULL) {
mError("qnode:%d, not exist", pDrop->dnodeId);
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
terrno = TSDB_CODE_MND_QNODE_NOT_EXIST;
return -1;
}
......
......@@ -73,43 +73,68 @@ static void mndReleaseSnode(SMnode *pMnode, SSnodeObj *pObj) {
}
static SSdbRaw *mndSnodeActionEncode(SSnodeObj *pObj) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRaw *pRaw = sdbAllocRaw(SDB_SNODE, TSDB_SNODE_VER_NUMBER, sizeof(SSnodeObj) + TSDB_SNODE_RESERVE_SIZE);
if (pRaw == NULL) return NULL;
if (pRaw == NULL) goto SNODE_ENCODE_OVER;
int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pObj->id);
SDB_SET_INT64(pRaw, dataPos, pObj->createdTime)
SDB_SET_INT64(pRaw, dataPos, pObj->updateTime)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_SNODE_RESERVE_SIZE)
SDB_SET_INT32(pRaw, dataPos, pObj->id, SNODE_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, SNODE_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, SNODE_ENCODE_OVER)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_SNODE_RESERVE_SIZE, SNODE_ENCODE_OVER)
terrno = 0;
SNODE_ENCODE_OVER:
if (terrno != 0) {
mError("snode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
sdbFreeRaw(pRaw);
return NULL;
}
mTrace("snode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
return pRaw;
}
static SSdbRow *mndSnodeActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SNODE_DECODE_OVER;
if (sver != TSDB_SNODE_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
mError("failed to decode snode since %s", terrstr());
return NULL;
goto SNODE_DECODE_OVER;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SSnodeObj));
SSdbRow *pRow = sdbAllocRow(sizeof(SSnodeObj));
if (pRow == NULL) goto SNODE_DECODE_OVER;
SSnodeObj *pObj = sdbGetRowObj(pRow);
if (pObj == NULL) return NULL;
if (pObj == NULL) goto SNODE_DECODE_OVER;
int32_t dataPos = 0;
SDB_GET_INT32(pRaw, pRow, dataPos, &pObj->id)
SDB_GET_INT64(pRaw, pRow, dataPos, &pObj->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pObj->updateTime)
SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_SNODE_RESERVE_SIZE)
SDB_GET_INT32(pRaw, dataPos, &pObj->id, SNODE_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, SNODE_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, SNODE_DECODE_OVER)
SDB_GET_RESERVE(pRaw, dataPos, TSDB_SNODE_RESERVE_SIZE, SNODE_DECODE_OVER)
terrno = 0;
SNODE_DECODE_OVER:
if (terrno != 0) {
mError("snode:%d, failed to decode from raw:%p since %s", pObj->id, pRaw, terrstr());
tfree(pRow);
return NULL;
}
mTrace("snode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
return pRow;
}
static int32_t mndSnodeActionInsert(SSdb *pSdb, SSnodeObj *pObj) {
mTrace("snode:%d, perform insert action", pObj->id);
mTrace("snode:%d, perform insert action, row:%p", pObj->id, pObj);
pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id);
if (pObj->pDnode == NULL) {
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
......@@ -121,7 +146,7 @@ static int32_t mndSnodeActionInsert(SSdb *pSdb, SSnodeObj *pObj) {
}
static int32_t mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj) {
mTrace("snode:%d, perform delete action", pObj->id);
mTrace("snode:%d, perform delete action, row:%p", pObj->id, pObj);
if (pObj->pDnode != NULL) {
sdbRelease(pSdb, pObj->pDnode);
pObj->pDnode = NULL;
......@@ -131,7 +156,7 @@ static int32_t mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj) {
}
static int32_t mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOldSnode, SSnodeObj *pNewSnode) {
mTrace("snode:%d, perform update action", pOldSnode->id);
mTrace("snode:%d, perform update action, old_row:%p new_row:%p", pOldSnode->id, pOldSnode, pNewSnode);
pOldSnode->updateTime = pNewSnode->updateTime;
return 0;
}
......@@ -339,7 +364,7 @@ static int32_t mndProcessDropSnodeReq(SMnodeMsg *pMsg) {
SSnodeObj *pObj = mndAcquireSnode(pMnode, pDrop->dnodeId);
if (pObj == NULL) {
mError("snode:%d, not exist", pDrop->dnodeId);
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
terrno = TSDB_CODE_MND_SNODE_NOT_EXIST;
return -1;
}
......
......@@ -70,90 +70,115 @@ int32_t mndInitStb(SMnode *pMnode) {
void mndCleanupStb(SMnode *pMnode) {}
static SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + TSDB_STB_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, TSDB_STB_VER_NUMBER, size);
if (pRaw == NULL) return NULL;
if (pRaw == NULL) goto STB_ENCODE_OVER;
int32_t dataPos = 0;
SDB_SET_BINARY(pRaw, dataPos, pStb->name, TSDB_TABLE_FNAME_LEN)
SDB_SET_BINARY(pRaw, dataPos, pStb->db, TSDB_DB_FNAME_LEN)
SDB_SET_INT64(pRaw, dataPos, pStb->createdTime)
SDB_SET_INT64(pRaw, dataPos, pStb->updateTime)
SDB_SET_INT64(pRaw, dataPos, pStb->uid)
SDB_SET_INT64(pRaw, dataPos, pStb->dbUid)
SDB_SET_INT32(pRaw, dataPos, pStb->version)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags)
SDB_SET_BINARY(pRaw, dataPos, pStb->name, TSDB_TABLE_FNAME_LEN, STB_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pStb->db, TSDB_DB_FNAME_LEN, STB_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pStb->createdTime, STB_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pStb->updateTime, STB_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pStb->uid, STB_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pStb->dbUid, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->version, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, STB_ENCODE_OVER)
int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pStb->pSchema[i];
SDB_SET_INT8(pRaw, dataPos, pSchema->type);
SDB_SET_INT32(pRaw, dataPos, pSchema->colId);
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes);
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN);
SDB_SET_INT8(pRaw, dataPos, pSchema->type, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pSchema->colId, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, STB_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_ENCODE_OVER)
}
SDB_SET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE)
SDB_SET_DATALEN(pRaw, dataPos);
SDB_SET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, STB_ENCODE_OVER)
SDB_SET_DATALEN(pRaw, dataPos, STB_ENCODE_OVER)
terrno = 0;
STB_ENCODE_OVER:
if (terrno != 0) {
mError("stb:%s, failed to encode to raw:%p since %s", pStb->name, pRaw, terrstr());
sdbFreeRaw(pRaw);
return NULL;
}
mTrace("stb:%s, encode to raw:%p, row:%p", pStb->name, pRaw, pStb);
return pRaw;
}
static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto STB_DECODE_OVER;
if (sver != TSDB_STB_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
mError("failed to decode stable since %s", terrstr());
return NULL;
goto STB_DECODE_OVER;
}
int32_t size = sizeof(SStbObj) + TSDB_MAX_COLUMNS * sizeof(SSchema);
SSdbRow *pRow = sdbAllocRow(size);
if (pRow == NULL) goto STB_DECODE_OVER;
SStbObj *pStb = sdbGetRowObj(pRow);
if (pStb == NULL) return NULL;
if (pStb == NULL) goto STB_DECODE_OVER;
int32_t dataPos = 0;
SDB_GET_BINARY(pRaw, pRow, dataPos, pStb->name, TSDB_TABLE_FNAME_LEN)
SDB_GET_BINARY(pRaw, pRow, dataPos, pStb->db, TSDB_DB_FNAME_LEN)
SDB_GET_INT64(pRaw, pRow, dataPos, &pStb->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pStb->updateTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pStb->uid)
SDB_GET_INT64(pRaw, pRow, dataPos, &pStb->dbUid)
SDB_GET_INT32(pRaw, pRow, dataPos, &pStb->version)
SDB_GET_INT32(pRaw, pRow, dataPos, &pStb->numOfColumns)
SDB_GET_INT32(pRaw, pRow, dataPos, &pStb->numOfTags)
SDB_GET_BINARY(pRaw, dataPos, pStb->name, TSDB_TABLE_FNAME_LEN, STB_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pStb->db, TSDB_DB_FNAME_LEN, STB_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pStb->createdTime, STB_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pStb->updateTime, STB_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pStb->uid, STB_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pStb->dbUid, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->version, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, STB_DECODE_OVER)
int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
pStb->pSchema = calloc(totalCols, sizeof(SSchema));
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pStb->pSchema[i];
SDB_GET_INT8(pRaw, pRow, dataPos, &pSchema->type);
SDB_GET_INT32(pRaw, pRow, dataPos, &pSchema->colId);
SDB_GET_INT32(pRaw, pRow, dataPos, &pSchema->bytes);
SDB_GET_BINARY(pRaw, pRow, dataPos, pSchema->name, TSDB_COL_NAME_LEN);
SDB_GET_INT8(pRaw, dataPos, &pSchema->type, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pSchema->colId, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, STB_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_DECODE_OVER)
}
SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_STB_RESERVE_SIZE)
SDB_GET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, STB_DECODE_OVER)
terrno = 0;
STB_DECODE_OVER:
if (terrno != 0) {
mError("stb:%s, failed to decode from raw:%p since %s", pStb->name, pRaw, terrstr());
tfree(pRow);
return NULL;
}
mTrace("stb:%s, decode from raw:%p, row:%p", pStb->name, pRaw, pStb);
return pRow;
}
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) {
mTrace("stb:%s, perform insert action", pStb->name);
mTrace("stb:%s, perform insert action, row:%p", pStb->name, pStb);
return 0;
}
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
mTrace("stb:%s, perform delete action", pStb->name);
mTrace("stb:%s, perform delete action, row:%p", pStb->name, pStb);
return 0;
}
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb) {
mTrace("stb:%s, perform update action", pOldStb->name);
mTrace("stb:%s, perform update action, old_row:%p new_row:%p", pOldStb->name, pOldStb, pNewStb);
atomic_exchange_32(&pOldStb->updateTime, pNewStb->updateTime);
atomic_exchange_32(&pOldStb->version, pNewStb->version);
......@@ -177,7 +202,7 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb
}
SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName) {
SSdb * pSdb = pMnode->pSdb;
SSdb *pSdb = pMnode->pSdb;
SStbObj *pStb = sdbAcquire(pSdb, SDB_STB, stbName);
if (pStb == NULL) {
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
......@@ -202,9 +227,9 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, char *stbName) {
static void *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int *pContLen) {
SVCreateTbReq req;
void * buf;
void *buf;
int bsize;
SMsgHead * pMsgHead;
SMsgHead *pMsgHead;
req.ver = 0;
req.name = pStb->name;
......@@ -325,9 +350,9 @@ static int32_t mndSetCreateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj
}
static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
SSdb * pSdb = pMnode->pSdb;
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
void * pIter = NULL;
void *pIter = NULL;
int contLen;
while (1) {
......@@ -361,9 +386,9 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
}
static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
SSdb * pSdb = pMnode->pSdb;
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
void * pIter = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
......@@ -467,7 +492,7 @@ CREATE_STB_OVER:
}
static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) {
SMnode * pMnode = pMsg->pMnode;
SMnode *pMnode = pMsg->pMnode;
SCreateStbMsg *pCreate = pMsg->rpcMsg.pCont;
mDebug("stb:%s, start to create", pCreate->name);
......@@ -551,7 +576,7 @@ static int32_t mndCheckAlterStbMsg(SAlterStbMsg *pAlter) {
static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pOldStb, SStbObj *pNewStb) { return 0; }
static int32_t mndProcessAlterStbMsg(SMnodeMsg *pMsg) {
SMnode * pMnode = pMsg->pMnode;
SMnode *pMnode = pMsg->pMnode;
SAlterStbMsg *pAlter = pMsg->rpcMsg.pCont;
mDebug("stb:%s, start to alter", pAlter->name);
......@@ -665,7 +690,7 @@ DROP_STB_OVER:
}
static int32_t mndProcessDropStbMsg(SMnodeMsg *pMsg) {
SMnode * pMnode = pMsg->pMnode;
SMnode *pMnode = pMsg->pMnode;
SDropStbMsg *pDrop = pMsg->rpcMsg.pCont;
mDebug("stb:%s, start to drop", pDrop->name);
......@@ -700,7 +725,7 @@ static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) {
}
static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) {
SMnode * pMnode = pMsg->pMnode;
SMnode *pMnode = pMsg->pMnode;
STableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
mDebug("stb:%s, start to retrieve meta", pInfo->tableFname);
......@@ -772,7 +797,7 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs
}
int32_t numOfStbs = 0;
void * pIter = NULL;
void *pIter = NULL;
while (1) {
SStbObj *pStb = NULL;
pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
......@@ -791,7 +816,7 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs
static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
SMnode *pMnode = pMsg->pMnode;
SSdb * pSdb = pMnode->pSdb;
SSdb *pSdb = pMnode->pSdb;
if (mndGetNumOfStbs(pMnode, pShow->db, &pShow->numOfRows) != 0) {
return -1;
......
......@@ -59,80 +59,105 @@ int32_t mndInitTopic(SMnode *pMnode) {
void mndCleanupTopic(SMnode *pMnode) {}
static SSdbRaw *mndTopicActionEncode(STopicObj *pTopic) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t size = sizeof(STopicObj) + MND_TOPIC_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
if (pRaw == NULL) return NULL;
if (pRaw == NULL) goto TOPIC_ENCODE_OVER;
int32_t dataPos = 0;
SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN);
SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN);
SDB_SET_INT64(pRaw, dataPos, pTopic->createTime);
SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime);
SDB_SET_INT64(pRaw, dataPos, pTopic->uid);
SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid);
SDB_SET_INT32(pRaw, dataPos, pTopic->version);
SDB_SET_INT32(pRaw, dataPos, pTopic->execLen);
SDB_SET_BINARY(pRaw, dataPos, pTopic->executor, pTopic->execLen);
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen);
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen);
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE);
SDB_SET_DATALEN(pRaw, dataPos);
SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN, TOPIC_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pTopic->execLen, TOPIC_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pTopic->executor, pTopic->execLen, TOPIC_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER)
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER)
SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER)
terrno = 0;
TOPIC_ENCODE_OVER:
if (terrno != 0) {
mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr());
sdbFreeRaw(pRaw);
return NULL;
}
mTrace("topic:%s, encode to raw:%p, row:%p", pTopic->name, pRaw, pTopic);
return pRaw;
}
static SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER;
if (sver != MND_TOPIC_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
mError("failed to decode topic since %s", terrstr());
return NULL;
goto TOPIC_DECODE_OVER;
}
int32_t size = sizeof(STopicObj) + TSDB_MAX_COLUMNS * sizeof(SSchema);
SSdbRow *pRow = sdbAllocRow(size);
int32_t size = sizeof(STopicObj) + TSDB_MAX_COLUMNS * sizeof(SSchema);
SSdbRow *pRow = sdbAllocRow(size);
if (pRow == NULL) goto TOPIC_DECODE_OVER;
STopicObj *pTopic = sdbGetRowObj(pRow);
if (pTopic == NULL) return NULL;
if (pTopic == NULL) goto TOPIC_DECODE_OVER;
int32_t dataPos = 0;
SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN);
SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->db, TSDB_DB_FNAME_LEN);
SDB_GET_INT64(pRaw, pRow, dataPos, &pTopic->createTime);
SDB_GET_INT64(pRaw, pRow, dataPos, &pTopic->updateTime);
SDB_GET_INT64(pRaw, pRow, dataPos, &pTopic->uid);
SDB_GET_INT64(pRaw, pRow, dataPos, &pTopic->dbUid);
SDB_GET_INT32(pRaw, pRow, dataPos, &pTopic->version);
SDB_GET_INT32(pRaw, pRow, dataPos, &pTopic->execLen);
SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->executor, pTopic->execLen);
SDB_GET_INT32(pRaw, pRow, dataPos, &pTopic->sqlLen);
SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->sql, pTopic->sqlLen);
SDB_GET_RESERVE(pRaw, pRow, dataPos, MND_TOPIC_RESERVE_SIZE);
SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN, TOPIC_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pTopic->execLen, TOPIC_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pTopic->executor, pTopic->execLen, TOPIC_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER)
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER)
terrno = 0;
TOPIC_DECODE_OVER:
if (terrno != 0) {
mError("topic:%s, failed to decode from raw:%p since %s", pTopic->name, pRaw, terrstr());
tfree(pRow);
return NULL;
}
mTrace("topic:%s, decode from raw:%p, row:%p", pTopic->name, pRaw, pTopic);
return pRow;
}
static int32_t mndTopicActionInsert(SSdb *pSdb, STopicObj *pTopic) {
mTrace("topic:%s, perform insert action", pTopic->name);
mTrace("topic:%s, perform insert action, row:%p", pTopic->name, pTopic);
return 0;
}
static int32_t mndTopicActionDelete(SSdb *pSdb, STopicObj *pTopic) {
mTrace("topic:%s, perform delete action", pTopic->name);
mTrace("topic:%s, perform delete action, row:%p", pTopic->name, pTopic);
return 0;
}
static int32_t mndTopicActionUpdate(SSdb *pSdb, STopicObj *pOldTopic, STopicObj *pNewTopic) {
mTrace("topic:%s, perform update action", pOldTopic->name);
mTrace("topic:%s, perform update action, old_row:%p new_row:%p", pOldTopic->name, pOldTopic, pNewTopic);
atomic_exchange_32(&pOldTopic->updateTime, pNewTopic->updateTime);
atomic_exchange_32(&pOldTopic->version, pNewTopic->version);
taosWLockLatch(&pOldTopic->lock);
//TODO handle update
// TODO handle update
taosWUnLockLatch(&pOldTopic->lock);
return 0;
......@@ -201,9 +226,9 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq
}
static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
char *msgStr = pMsg->rpcMsg.pCont;
SCMCreateTopicReq* pCreate;
SMnode *pMnode = pMsg->pMnode;
char *msgStr = pMsg->rpcMsg.pCont;
SCMCreateTopicReq *pCreate;
tDeserializeSCMCreateTopicReq(msgStr, pCreate);
mDebug("topic:%s, start to create", pCreate->name);
......@@ -245,9 +270,7 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, STopicObj *pTopic) {
return 0;
}
static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, STopicObj *pTopic) { return 0; }
static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
......
......@@ -31,6 +31,7 @@ static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw);
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction);
static void mndTransDropLogs(SArray *pArray);
static void mndTransDropActions(SArray *pArray);
static void mndTransDropData(STrans *pTrans);
static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray);
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray);
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans);
......@@ -70,6 +71,8 @@ int32_t mndInitTrans(SMnode *pMnode) {
void mndCleanupTrans(SMnode *pMnode) {}
static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t rawDataLen = sizeof(STrans) + MND_TRANS_RESERVE_SIZE;
int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs);
int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs);
......@@ -109,75 +112,95 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
}
int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pTrans->id)
SDB_SET_INT8(pRaw, dataPos, pTrans->policy)
SDB_SET_INT32(pRaw, dataPos, redoLogNum)
SDB_SET_INT32(pRaw, dataPos, undoLogNum)
SDB_SET_INT32(pRaw, dataPos, commitLogNum)
SDB_SET_INT32(pRaw, dataPos, redoActionNum)
SDB_SET_INT32(pRaw, dataPos, undoActionNum)
SDB_SET_INT32(pRaw, dataPos, pTrans->id, TRANS_ENCODE_OVER)
SDB_SET_INT8(pRaw, dataPos, pTrans->policy, TRANS_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, redoLogNum, TRANS_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, undoLogNum, TRANS_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, commitLogNum, TRANS_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, redoActionNum, TRANS_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, undoActionNum, TRANS_ENCODE_OVER)
for (int32_t i = 0; i < redoLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i);
int32_t len = sdbGetRawTotalSize(pTmp);
SDB_SET_INT32(pRaw, dataPos, len)
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len)
SDB_SET_INT32(pRaw, dataPos, len, TRANS_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, TRANS_ENCODE_OVER)
}
for (int32_t i = 0; i < undoLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i);
int32_t len = sdbGetRawTotalSize(pTmp);
SDB_SET_INT32(pRaw, dataPos, len)
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len)
SDB_SET_INT32(pRaw, dataPos, len, TRANS_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, TRANS_ENCODE_OVER)
}
for (int32_t i = 0; i < commitLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i);
int32_t len = sdbGetRawTotalSize(pTmp);
SDB_SET_INT32(pRaw, dataPos, len)
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len)
SDB_SET_INT32(pRaw, dataPos, len, TRANS_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, TRANS_ENCODE_OVER)
}
for (int32_t i = 0; i < redoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet));
SDB_SET_INT16(pRaw, dataPos, pAction->msgType)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen);
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), TRANS_ENCODE_OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, TRANS_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, TRANS_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, TRANS_ENCODE_OVER)
}
for (int32_t i = 0; i < undoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->undoActions, i);
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet));
SDB_SET_INT16(pRaw, dataPos, pAction->msgType)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen);
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), TRANS_ENCODE_OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, TRANS_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, TRANS_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen, TRANS_ENCODE_OVER)
}
SDB_SET_RESERVE(pRaw, dataPos, MND_TRANS_RESERVE_SIZE, TRANS_ENCODE_OVER)
SDB_SET_DATALEN(pRaw, dataPos, TRANS_ENCODE_OVER)
terrno = 0;
TRANS_ENCODE_OVER:
if (terrno != 0) {
mError("trans:%d, failed to encode to raw:%p len:%d since %s", pTrans->id, pRaw, dataPos, terrstr());
sdbFreeRaw(pRaw);
return NULL;
}
SDB_SET_RESERVE(pRaw, dataPos, MND_TRANS_RESERVE_SIZE)
SDB_SET_DATALEN(pRaw, dataPos);
mTrace("trans:%d, encode to raw:%p, len:%d", pTrans->id, pRaw, dataPos);
return pRaw;
}
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
int32_t code = 0;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRow *pRow = NULL;
STrans *pTrans = NULL;
char *pData = NULL;
int32_t dataLen = 0;
int8_t sver = 0;
int32_t redoLogNum = 0;
int32_t undoLogNum = 0;
int32_t commitLogNum = 0;
int32_t redoActionNum = 0;
int32_t undoActionNum = 0;
int32_t dataPos = 0;
STransAction action = {0};
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TRANS_DECODE_OVER;
if (sver != MND_TRANS_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
mError("failed to decode trans since %s", terrstr());
return NULL;
goto TRANS_DECODE_OVER;
}
SSdbRow *pRow = sdbAllocRow(sizeof(STrans));
STrans *pTrans = sdbGetRowObj(pRow);
if (pTrans == NULL) {
mError("failed to alloc trans from raw:%p since %s", pRaw, terrstr());
return NULL;
}
pRow = sdbAllocRow(sizeof(STrans));
if (pRow == NULL) goto TRANS_DECODE_OVER;
pTrans = sdbGetRowObj(pRow);
if (pTrans == NULL) goto TRANS_DECODE_OVER;
pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
pTrans->undoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
......@@ -185,112 +208,79 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
pTrans->redoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->undoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction));
if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL ||
pTrans->redoActions == NULL || pTrans->undoActions == NULL) {
mDebug("trans:%d, failed to create array while parsed from raw:%p", pTrans->id, pRaw);
code = TSDB_CODE_OUT_OF_MEMORY;
goto TRANS_DECODE_OVER;
}
int32_t redoLogNum = 0;
int32_t undoLogNum = 0;
int32_t commitLogNum = 0;
int32_t redoActionNum = 0;
int32_t undoActionNum = 0;
if (pTrans->redoLogs == NULL) goto TRANS_DECODE_OVER;
if (pTrans->undoLogs == NULL) goto TRANS_DECODE_OVER;
if (pTrans->commitLogs == NULL) goto TRANS_DECODE_OVER;
if (pTrans->redoActions == NULL) goto TRANS_DECODE_OVER;
if (pTrans->undoActions == NULL) goto TRANS_DECODE_OVER;
int32_t dataPos = 0;
SDB_GET_INT32(pRaw, pRow, dataPos, &pTrans->id)
SDB_GET_INT8(pRaw, pRow, dataPos, (int8_t *)&pTrans->policy)
SDB_GET_INT32(pRaw, pRow, dataPos, &redoLogNum)
SDB_GET_INT32(pRaw, pRow, dataPos, &undoLogNum)
SDB_GET_INT32(pRaw, pRow, dataPos, &commitLogNum)
SDB_GET_INT32(pRaw, pRow, dataPos, &redoActionNum)
SDB_GET_INT32(pRaw, pRow, dataPos, &undoActionNum)
SDB_GET_INT32(pRaw, dataPos, &pTrans->id, TRANS_DECODE_OVER)
SDB_GET_INT8(pRaw, dataPos, (int8_t *)&pTrans->policy, TRANS_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &redoLogNum, TRANS_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &undoLogNum, TRANS_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &commitLogNum, TRANS_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &redoActionNum, TRANS_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &undoActionNum, TRANS_DECODE_OVER)
for (int32_t i = 0; i < redoLogNum; ++i) {
int32_t dataLen = 0;
SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen)
char *pData = malloc(dataLen);
SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen);
void *ret = taosArrayPush(pTrans->redoLogs, &pData);
if (ret == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto TRANS_DECODE_OVER;
}
SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER)
pData = malloc(dataLen);
if (pData == NULL) goto TRANS_DECODE_OVER;
SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER);
if (taosArrayPush(pTrans->redoLogs, &pData) == NULL) goto TRANS_DECODE_OVER;
pData = NULL;
}
for (int32_t i = 0; i < undoLogNum; ++i) {
int32_t dataLen = 0;
SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen)
char *pData = malloc(dataLen);
SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen);
void *ret = taosArrayPush(pTrans->undoLogs, &pData);
if (ret == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto TRANS_DECODE_OVER;
}
SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER)
pData = malloc(dataLen);
if (pData == NULL) goto TRANS_DECODE_OVER;
SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER);
if (taosArrayPush(pTrans->undoLogs, &pData) == NULL) goto TRANS_DECODE_OVER;
pData = NULL;
}
for (int32_t i = 0; i < commitLogNum; ++i) {
int32_t dataLen = 0;
SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen)
char *pData = malloc(dataLen);
SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen);
void *ret = taosArrayPush(pTrans->commitLogs, &pData);
if (ret == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto TRANS_DECODE_OVER;
}
SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER)
pData = malloc(dataLen);
SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER);
if (taosArrayPush(pTrans->commitLogs, &pData) == NULL) goto TRANS_DECODE_OVER;
pData = NULL;
}
for (int32_t i = 0; i < redoActionNum; ++i) {
STransAction action = {0};
SDB_GET_BINARY(pRaw, pRow, dataPos, (void *)&action.epSet, sizeof(SEpSet));
SDB_GET_INT16(pRaw, pRow, dataPos, &action.msgType)
SDB_GET_INT32(pRaw, pRow, dataPos, &action.contLen)
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), TRANS_DECODE_OVER);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, TRANS_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, TRANS_DECODE_OVER)
action.pCont = malloc(action.contLen);
if (action.pCont == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto TRANS_DECODE_OVER;
}
SDB_GET_BINARY(pRaw, pRow, dataPos, action.pCont, action.contLen);
void *ret = taosArrayPush(pTrans->redoActions, &action);
if (ret == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto TRANS_DECODE_OVER;
}
if (action.pCont == NULL) goto TRANS_DECODE_OVER;
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, TRANS_DECODE_OVER);
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto TRANS_DECODE_OVER;
action.pCont = NULL;
}
for (int32_t i = 0; i < undoActionNum; ++i) {
STransAction action = {0};
SDB_GET_BINARY(pRaw, pRow, dataPos, (void *)&action.epSet, sizeof(SEpSet));
SDB_GET_INT16(pRaw, pRow, dataPos, &action.msgType)
SDB_GET_INT32(pRaw, pRow, dataPos, &action.contLen)
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), TRANS_DECODE_OVER);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, TRANS_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, TRANS_DECODE_OVER)
action.pCont = malloc(action.contLen);
if (action.pCont == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto TRANS_DECODE_OVER;
}
SDB_GET_BINARY(pRaw, pRow, dataPos, action.pCont, action.contLen);
void *ret = taosArrayPush(pTrans->undoActions, &action);
if (ret == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto TRANS_DECODE_OVER;
}
if (action.pCont == NULL) goto TRANS_DECODE_OVER;
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, TRANS_DECODE_OVER);
if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto TRANS_DECODE_OVER;
action.pCont = NULL;
}
SDB_GET_RESERVE(pRaw, pRow, dataPos, MND_TRANS_RESERVE_SIZE)
SDB_GET_RESERVE(pRaw, dataPos, MND_TRANS_RESERVE_SIZE, TRANS_DECODE_OVER)
terrno = 0;
TRANS_DECODE_OVER:
if (code != 0) {
mError("trans:%d, failed to parse from raw:%p since %s", pTrans->id, pRaw, tstrerror(errno));
mndTransDrop(pTrans);
terrno = code;
if (terrno != 0) {
mError("trans:%d, failed to parse from raw:%p since %s", pTrans->id, pRaw, terrstr());
mndTransDropData(pTrans);
tfree(pRow);
tfree(pData);
tfree(action.pCont);
return NULL;
}
......@@ -304,15 +294,17 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
return 0;
}
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) {
mTrace("trans:%d, perform delete action, data:%p", pTrans->id, pTrans);
static void mndTransDropData(STrans *pTrans) {
mndTransDropLogs(pTrans->redoLogs);
mndTransDropLogs(pTrans->undoLogs);
mndTransDropLogs(pTrans->commitLogs);
mndTransDropActions(pTrans->redoActions);
mndTransDropActions(pTrans->undoActions);
}
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) {
mTrace("trans:%d, perform delete action, data:%p", pTrans->id, pTrans);
mndTransDropData(pTrans);
return 0;
}
......@@ -367,6 +359,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg) {
}
static void mndTransDropLogs(SArray *pArray) {
if (pArray == NULL) return;
for (int32_t i = 0; i < pArray->size; ++i) {
SSdbRaw *pRaw = taosArrayGetP(pArray, i);
sdbFreeRaw(pRaw);
......@@ -376,6 +369,7 @@ static void mndTransDropLogs(SArray *pArray) {
}
static void mndTransDropActions(SArray *pArray) {
if (pArray == NULL) return;
for (int32_t i = 0; i < pArray->size; ++i) {
STransAction *pAction = taosArrayGet(pArray, i);
free(pAction->pCont);
......@@ -385,12 +379,7 @@ static void mndTransDropActions(SArray *pArray) {
}
void mndTransDrop(STrans *pTrans) {
mndTransDropLogs(pTrans->redoLogs);
mndTransDropLogs(pTrans->undoLogs);
mndTransDropLogs(pTrans->commitLogs);
mndTransDropActions(pTrans->redoActions);
mndTransDropActions(pTrans->undoActions);
mndTransDropData(pTrans);
mDebug("trans:%d, is dropped, data:%p", pTrans->id, pTrans);
tfree(pTrans);
}
......
......@@ -94,45 +94,68 @@ static int32_t mndCreateDefaultUsers(SMnode *pMnode) {
}
static SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, TSDB_USER_VER_NUMBER, sizeof(SUserObj) + TSDB_USER_RESERVE_SIZE);
if (pRaw == NULL) return NULL;
if (pRaw == NULL) goto USER_ENCODE_OVER;
int32_t dataPos = 0;
SDB_SET_BINARY(pRaw, dataPos, pUser->user, TSDB_USER_LEN)
SDB_SET_BINARY(pRaw, dataPos, pUser->pass, TSDB_PASSWORD_LEN)
SDB_SET_BINARY(pRaw, dataPos, pUser->acct, TSDB_USER_LEN)
SDB_SET_INT64(pRaw, dataPos, pUser->createdTime)
SDB_SET_INT64(pRaw, dataPos, pUser->updateTime)
SDB_SET_INT8(pRaw, dataPos, pUser->superUser)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_USER_RESERVE_SIZE)
SDB_SET_DATALEN(pRaw, dataPos);
SDB_SET_BINARY(pRaw, dataPos, pUser->user, TSDB_USER_LEN, USER_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pUser->pass, TSDB_PASSWORD_LEN, USER_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pUser->acct, TSDB_USER_LEN, USER_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pUser->createdTime, USER_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pUser->updateTime, USER_ENCODE_OVER)
SDB_SET_INT8(pRaw, dataPos, pUser->superUser, USER_ENCODE_OVER)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_USER_RESERVE_SIZE, USER_ENCODE_OVER)
SDB_SET_DATALEN(pRaw, dataPos, USER_ENCODE_OVER)
terrno = 0;
USER_ENCODE_OVER:
if (terrno != 0) {
mError("user:%s, failed to encode to raw:%p since %s", pUser->user, pRaw, terrstr());
sdbFreeRaw(pRaw);
return NULL;
}
mTrace("user:%s, encode to raw:%p, row:%p", pUser->user, pRaw, pUser);
return pRaw;
}
static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto USER_DECODE_OVER;
if (sver != TSDB_USER_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
mError("failed to decode user since %s", terrstr());
return NULL;
goto USER_DECODE_OVER;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SUserObj));
SSdbRow *pRow = sdbAllocRow(sizeof(SUserObj));
if (pRow == NULL) goto USER_DECODE_OVER;
SUserObj *pUser = sdbGetRowObj(pRow);
if (pUser == NULL) return NULL;
if (pUser == NULL) goto USER_DECODE_OVER;
int32_t dataPos = 0;
SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->user, TSDB_USER_LEN)
SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->pass, TSDB_PASSWORD_LEN)
SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->acct, TSDB_USER_LEN)
SDB_GET_INT64(pRaw, pRow, dataPos, &pUser->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pUser->updateTime)
SDB_GET_INT8(pRaw, pRow, dataPos, &pUser->superUser)
SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_USER_RESERVE_SIZE)
SDB_GET_BINARY(pRaw, dataPos, pUser->user, TSDB_USER_LEN, USER_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pUser->pass, TSDB_PASSWORD_LEN, USER_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pUser->acct, TSDB_USER_LEN, USER_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pUser->createdTime, USER_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pUser->updateTime, USER_DECODE_OVER)
SDB_GET_INT8(pRaw, dataPos, &pUser->superUser, USER_DECODE_OVER)
SDB_GET_RESERVE(pRaw, dataPos, TSDB_USER_RESERVE_SIZE, USER_DECODE_OVER)
terrno = 0;
USER_DECODE_OVER:
if (terrno != 0) {
mError("user:%s, failed to decode from raw:%p since %s", pUser->user, pRaw, terrstr());
tfree(pRow);
return NULL;
}
mTrace("user:%s, decode from raw:%p, row:%p", pUser->user, pRaw, pUser);
return pRow;
......
......@@ -70,77 +70,102 @@ int32_t mndInitVgroup(SMnode *pMnode) {
void mndCleanupVgroup(SMnode *pMnode) {}
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRaw *pRaw = sdbAllocRaw(SDB_VGROUP, TSDB_VGROUP_VER_NUMBER, sizeof(SVgObj) + TSDB_VGROUP_RESERVE_SIZE);
if (pRaw == NULL) return NULL;
if (pRaw == NULL) goto VG_ENCODE_OVER;
int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pVgroup->vgId)
SDB_SET_INT64(pRaw, dataPos, pVgroup->createdTime)
SDB_SET_INT64(pRaw, dataPos, pVgroup->updateTime)
SDB_SET_INT32(pRaw, dataPos, pVgroup->version)
SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin)
SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd)
SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN)
SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid)
SDB_SET_INT8(pRaw, dataPos, pVgroup->replica)
SDB_SET_INT32(pRaw, dataPos, pVgroup->vgId, VG_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pVgroup->createdTime, VG_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pVgroup->updateTime, VG_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pVgroup->version, VG_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin, VG_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd, VG_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, VG_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid, VG_ENCODE_OVER)
SDB_SET_INT8(pRaw, dataPos, pVgroup->replica, VG_ENCODE_OVER)
for (int8_t i = 0; i < pVgroup->replica; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
SDB_SET_INT32(pRaw, dataPos, pVgid->dnodeId)
SDB_SET_INT32(pRaw, dataPos, pVgid->dnodeId, VG_ENCODE_OVER)
}
SDB_SET_RESERVE(pRaw, dataPos, TSDB_VGROUP_RESERVE_SIZE, VG_ENCODE_OVER)
SDB_SET_DATALEN(pRaw, dataPos, VG_ENCODE_OVER)
terrno = 0;
VG_ENCODE_OVER:
if (terrno != 0) {
mError("vgId:%d, failed to encode to raw:%p since %s", pVgroup->vgId, pRaw, terrstr());
sdbFreeRaw(pRaw);
return NULL;
}
SDB_SET_RESERVE(pRaw, dataPos, TSDB_VGROUP_RESERVE_SIZE)
SDB_SET_DATALEN(pRaw, dataPos);
mTrace("vgId:%d, encode to raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
return pRaw;
}
SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto VG_DECODE_OVER;
if (sver != TSDB_VGROUP_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
mError("failed to decode vgroup since %s", terrstr());
return NULL;
goto VG_DECODE_OVER;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SVgObj));
SVgObj *pVgroup = sdbGetRowObj(pRow);
if (pVgroup == NULL) return NULL;
if (pRow == NULL) goto VG_DECODE_OVER;
SVgObj *pVgroup = sdbGetRowObj(pRow);
if (pVgroup == NULL) goto VG_DECODE_OVER;
int32_t dataPos = 0;
SDB_GET_INT32(pRaw, pRow, dataPos, &pVgroup->vgId)
SDB_GET_INT64(pRaw, pRow, dataPos, &pVgroup->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pVgroup->updateTime)
SDB_GET_INT32(pRaw, pRow, dataPos, &pVgroup->version)
SDB_GET_INT32(pRaw, pRow, dataPos, &pVgroup->hashBegin)
SDB_GET_INT32(pRaw, pRow, dataPos, &pVgroup->hashEnd)
SDB_GET_BINARY(pRaw, pRow, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN)
SDB_GET_INT64(pRaw, pRow, dataPos, &pVgroup->dbUid)
SDB_GET_INT8(pRaw, pRow, dataPos, &pVgroup->replica)
SDB_GET_INT32(pRaw, dataPos, &pVgroup->vgId, VG_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pVgroup->createdTime, VG_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pVgroup->updateTime, VG_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pVgroup->version, VG_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashBegin, VG_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashEnd, VG_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, VG_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pVgroup->dbUid, VG_DECODE_OVER)
SDB_GET_INT8(pRaw, dataPos, &pVgroup->replica, VG_DECODE_OVER)
for (int8_t i = 0; i < pVgroup->replica; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
SDB_GET_INT32(pRaw, pRow, dataPos, &pVgid->dnodeId)
SDB_GET_INT32(pRaw, dataPos, &pVgid->dnodeId, VG_DECODE_OVER)
if (pVgroup->replica == 1) {
pVgid->role = TAOS_SYNC_STATE_LEADER;
}
}
SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_VGROUP_RESERVE_SIZE)
SDB_GET_RESERVE(pRaw, dataPos, TSDB_VGROUP_RESERVE_SIZE, VG_DECODE_OVER)
terrno = 0;
VG_DECODE_OVER:
if (terrno != 0) {
mError("vgId:%d, failed to decode from raw:%p since %s", pVgroup->vgId, pRaw, terrstr());
tfree(pRow);
return NULL;
}
mTrace("vgId:%d, decode from raw:%p, row:%p", pVgroup->vgId, pRaw, pVgroup);
return pRow;
}
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) {
mTrace("vgId:%d, perform insert action", pVgroup->vgId);
mTrace("vgId:%d, perform insert action, row:%p", pVgroup->vgId, pVgroup);
return 0;
}
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup) {
mTrace("vgId:%d, perform delete action", pVgroup->vgId);
mTrace("vgId:%d, perform delete action, row:%p", pVgroup->vgId, pVgroup);
return 0;
}
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup) {
mTrace("vgId:%d, perform update action", pOldVgroup->vgId);
mTrace("vgId:%d, perform update action, old_row:%p new_row:%p", pOldVgroup->vgId, pOldVgroup, pNewVgroup);
pOldVgroup->updateTime = pNewVgroup->updateTime;
pOldVgroup->version = pNewVgroup->version;
pOldVgroup->hashBegin = pNewVgroup->hashBegin;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册