提交 73675bc5 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into feature/vnode

...@@ -305,6 +305,8 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { ...@@ -305,6 +305,8 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
} else { } else {
CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pDag), _return); CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pDag), _return);
CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag, &pJob), _return); CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag, &pJob), _return);
pRequest->code = terrno;
return pRequest;
} }
_return: _return:
......
...@@ -31,8 +31,8 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups); ...@@ -31,8 +31,8 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups);
SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup); SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup);
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId); int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId);
SCreateVnodeReq *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup); SCreateVnodeReq *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup);
SDropVnodeReq *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup); SDropVnodeReq *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -177,7 +177,7 @@ static int32_t mndAcctActionDelete(SSdb *pSdb, SAcctObj *pAcct) { ...@@ -177,7 +177,7 @@ static int32_t mndAcctActionDelete(SSdb *pSdb, SAcctObj *pAcct) {
} }
static int32_t mndAcctActionUpdate(SSdb *pSdb, SAcctObj *pOld, SAcctObj *pNew) { static int32_t mndAcctActionUpdate(SSdb *pSdb, SAcctObj *pOld, SAcctObj *pNew) {
mTrace("acct:%s, perform update action, old_row:%p new_row:%p", pOld->acct, pOld, pNew); mTrace("acct:%s, perform update action, old row:%p new row:%p", pOld->acct, pOld, pNew);
pOld->updateTime = pNew->updateTime; pOld->updateTime = pNew->updateTime;
pOld->status = pNew->status; pOld->status = pNew->status;
......
...@@ -155,7 +155,7 @@ static int32_t mndBnodeActionDelete(SSdb *pSdb, SBnodeObj *pObj) { ...@@ -155,7 +155,7 @@ static int32_t mndBnodeActionDelete(SSdb *pSdb, SBnodeObj *pObj) {
} }
static int32_t mndBnodeActionUpdate(SSdb *pSdb, SBnodeObj *pOld, SBnodeObj *pNew) { static int32_t mndBnodeActionUpdate(SSdb *pSdb, SBnodeObj *pOld, SBnodeObj *pNew) {
mTrace("bnode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew); mTrace("bnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
pOld->updateTime = pNew->updateTime; pOld->updateTime = pNew->updateTime;
return 0; return 0;
} }
......
...@@ -135,7 +135,7 @@ static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster) { ...@@ -135,7 +135,7 @@ static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster) {
} }
static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOld, SClusterObj *pNew) { static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOld, SClusterObj *pNew) {
mTrace("cluster:%" PRId64 ", perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew); mTrace("cluster:%" PRId64 ", perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
return 0; return 0;
} }
......
...@@ -28,7 +28,7 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb); ...@@ -28,7 +28,7 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb);
static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw); static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw);
static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb); static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb);
static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb); static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb);
static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOldDb, SDbObj *pNewDb); static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew);
static int32_t mndProcessCreateDbReq(SMnodeMsg *pReq); static int32_t mndProcessCreateDbReq(SMnodeMsg *pReq);
static int32_t mndProcessAlterDbReq(SMnodeMsg *pReq); static int32_t mndProcessAlterDbReq(SMnodeMsg *pReq);
static int32_t mndProcessDropDbReq(SMnodeMsg *pReq); static int32_t mndProcessDropDbReq(SMnodeMsg *pReq);
...@@ -182,12 +182,12 @@ static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb) { ...@@ -182,12 +182,12 @@ static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb) {
return 0; return 0;
} }
static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOldDb, SDbObj *pNewDb) { static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) {
mTrace("db:%s, perform update action, old_row:%p new_row:%p", pOldDb->name, pOldDb, pNewDb); mTrace("db:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
pOldDb->updateTime = pNewDb->updateTime; pOld->updateTime = pNew->updateTime;
pOldDb->cfgVersion = pNewDb->cfgVersion; pOld->cfgVersion = pNew->cfgVersion;
pOldDb->vgVersion = pNewDb->vgVersion; pOld->vgVersion = pNew->vgVersion;
memcpy(&pOldDb->cfg, &pNewDb->cfg, sizeof(SDbCfg)); memcpy(&pOld->cfg, &pNew->cfg, sizeof(SDbCfg));
return 0; return 0;
} }
...@@ -331,14 +331,15 @@ static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -331,14 +331,15 @@ static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
action.epSet = mndGetDnodeEpset(pDnode); action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
SCreateVnodeReq *pMsg = mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup); SCreateVnodeReq *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup);
if (pMsg == NULL) return -1; if (pReq == NULL) return -1;
action.pCont = pMsg; action.pCont = pReq;
action.contLen = sizeof(SCreateVnodeReq); action.contLen = sizeof(SCreateVnodeReq);
action.msgType = TDMT_DND_CREATE_VNODE; action.msgType = TDMT_DND_CREATE_VNODE;
action.acceptableCode = TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg); free(pReq);
return -1; return -1;
} }
} }
...@@ -360,14 +361,15 @@ static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -360,14 +361,15 @@ static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
action.epSet = mndGetDnodeEpset(pDnode); action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
SDropVnodeReq *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup); SDropVnodeReq *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup);
if (pMsg == NULL) return -1; if (pReq == NULL) return -1;
action.pCont = pMsg; action.pCont = pReq;
action.contLen = sizeof(SDropVnodeReq); action.contLen = sizeof(SDropVnodeReq);
action.msgType = TDMT_DND_DROP_VNODE; action.msgType = TDMT_DND_DROP_VNODE;
action.acceptableCode = TSDB_CODE_DND_VNODE_NOT_DEPLOYED;
if (mndTransAppendUndoAction(pTrans, &action) != 0) { if (mndTransAppendUndoAction(pTrans, &action) != 0) {
free(pMsg); free(pReq);
return -1; return -1;
} }
} }
...@@ -376,7 +378,7 @@ static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -376,7 +378,7 @@ static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
return 0; return 0;
} }
static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreate, SUserObj *pUser) { static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pReq, SCreateDbMsg *pCreate, SUserObj *pUser) {
SDbObj dbObj = {0}; SDbObj dbObj = {0};
memcpy(dbObj.name, pCreate->db, TSDB_DB_FNAME_LEN); memcpy(dbObj.name, pCreate->db, TSDB_DB_FNAME_LEN);
memcpy(dbObj.acct, pUser->acct, TSDB_USER_LEN); memcpy(dbObj.acct, pUser->acct, TSDB_USER_LEN);
...@@ -425,43 +427,17 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat ...@@ -425,43 +427,17 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat
} }
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) goto CREATE_DB_OVER;
mError("db:%s, failed to create since %s", pCreate->db, terrstr());
goto CREATE_DB_OVER;
}
mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db); mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db);
if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) { if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER;
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER;
goto CREATE_DB_OVER; if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER;
} if (mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER;
if (mndSetCreateDbUndoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER;
if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_DB_OVER;
mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr());
goto CREATE_DB_OVER;
}
if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) {
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
goto CREATE_DB_OVER;
}
if (mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto CREATE_DB_OVER;
}
if (mndSetCreateDbUndoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto CREATE_DB_OVER;
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
goto CREATE_DB_OVER;
}
code = 0; code = 0;
...@@ -471,9 +447,9 @@ CREATE_DB_OVER: ...@@ -471,9 +447,9 @@ CREATE_DB_OVER:
return code; return code;
} }
static int32_t mndProcessCreateDbReq(SMnodeMsg *pMsg) { static int32_t mndProcessCreateDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SCreateDbMsg *pCreate = pMsg->rpcMsg.pCont; SCreateDbMsg *pCreate = pReq->rpcMsg.pCont;
pCreate->numOfVgroups = htonl(pCreate->numOfVgroups); pCreate->numOfVgroups = htonl(pCreate->numOfVgroups);
pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize); pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize);
...@@ -502,13 +478,13 @@ static int32_t mndProcessCreateDbReq(SMnodeMsg *pMsg) { ...@@ -502,13 +478,13 @@ static int32_t mndProcessCreateDbReq(SMnodeMsg *pMsg) {
} }
} }
SUserObj *pOperUser = mndAcquireUser(pMnode, pMsg->user); SUserObj *pOperUser = mndAcquireUser(pMnode, pReq->user);
if (pOperUser == NULL) { if (pOperUser == NULL) {
mError("db:%s, failed to create since %s", pCreate->db, terrstr()); mError("db:%s, failed to create since %s", pCreate->db, terrstr());
return -1; return -1;
} }
int32_t code = mndCreateDb(pMnode, pMsg, pCreate, pOperUser); int32_t code = mndCreateDb(pMnode, pReq, pCreate, pOperUser);
mndReleaseUser(pMnode, pOperUser); mndReleaseUser(pMnode, pOperUser);
if (code != 0) { if (code != 0) {
...@@ -565,8 +541,8 @@ static int32_t mndSetDbCfgFromAlterDbMsg(SDbObj *pDb, SAlterDbMsg *pAlter) { ...@@ -565,8 +541,8 @@ static int32_t mndSetDbCfgFromAlterDbMsg(SDbObj *pDb, SAlterDbMsg *pAlter) {
return terrno; return terrno;
} }
static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
SSdbRaw *pRedoRaw = mndDbActionEncode(pOldDb); SSdbRaw *pRedoRaw = mndDbActionEncode(pOld);
if (pRedoRaw == NULL) return -1; if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_UPDATING) != 0) return -1; if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_UPDATING) != 0) return -1;
...@@ -574,8 +550,8 @@ static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pO ...@@ -574,8 +550,8 @@ static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pO
return 0; return 0;
} }
static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
SSdbRaw *pCommitRaw = mndDbActionEncode(pNewDb); SSdbRaw *pCommitRaw = mndDbActionEncode(pNew);
if (pCommitRaw == NULL) return -1; if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1; if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
...@@ -593,14 +569,14 @@ static int32_t mndBuildUpdateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -593,14 +569,14 @@ static int32_t mndBuildUpdateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
action.epSet = mndGetDnodeEpset(pDnode); action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
SAlterVnodeReq *pMsg = (SAlterVnodeReq *)mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup); SAlterVnodeReq *pReq = (SAlterVnodeReq *)mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup);
if (pMsg == NULL) return -1; if (pReq == NULL) return -1;
action.pCont = pMsg; action.pCont = pReq;
action.contLen = sizeof(SAlterVnodeReq); action.contLen = sizeof(SAlterVnodeReq);
action.msgType = TDMT_DND_ALTER_VNODE; action.msgType = TDMT_DND_ALTER_VNODE;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg); free(pReq);
return -1; return -1;
} }
} }
...@@ -608,7 +584,7 @@ static int32_t mndBuildUpdateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -608,7 +584,7 @@ static int32_t mndBuildUpdateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
return 0; return 0;
} }
static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL; void *pIter = NULL;
...@@ -617,8 +593,8 @@ static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -617,8 +593,8 @@ static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid == pNewDb->uid) { if (pVgroup->dbUid == pNew->uid) {
if (mndBuildUpdateVgroupAction(pMnode, pTrans, pNewDb, pVgroup) != 0) { if (mndBuildUpdateVgroupAction(pMnode, pTrans, pNew, pVgroup) != 0) {
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
return -1; return -1;
...@@ -631,27 +607,27 @@ static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -631,27 +607,27 @@ static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
return 0; return 0;
} }
static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbObj *pNewDb) { static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pOld, SDbObj *pNew) {
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("db:%s, failed to update since %s", pOldDb->name, terrstr()); mError("db:%s, failed to update since %s", pOld->name, terrstr());
return terrno; return terrno;
} }
mDebug("trans:%d, used to update db:%s", pTrans->id, pOldDb->name); mDebug("trans:%d, used to update db:%s", pTrans->id, pOld->name);
if (mndSetUpdateDbRedoLogs(pMnode, pTrans, pOldDb, pNewDb) != 0) { if (mndSetUpdateDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) {
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
goto UPDATE_DB_OVER; goto UPDATE_DB_OVER;
} }
if (mndSetUpdateDbCommitLogs(pMnode, pTrans, pOldDb, pNewDb) != 0) { if (mndSetUpdateDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) {
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
goto UPDATE_DB_OVER; goto UPDATE_DB_OVER;
} }
if (mndSetUpdateDbRedoActions(pMnode, pTrans, pOldDb, pNewDb) != 0) { if (mndSetUpdateDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto UPDATE_DB_OVER; goto UPDATE_DB_OVER;
} }
...@@ -668,9 +644,9 @@ UPDATE_DB_OVER: ...@@ -668,9 +644,9 @@ UPDATE_DB_OVER:
return code; return code;
} }
static int32_t mndProcessAlterDbReq(SMnodeMsg *pMsg) { static int32_t mndProcessAlterDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SAlterDbMsg *pAlter = pMsg->rpcMsg.pCont; SAlterDbMsg *pAlter = pReq->rpcMsg.pCont;
pAlter->totalBlocks = htonl(pAlter->totalBlocks); pAlter->totalBlocks = htonl(pAlter->totalBlocks);
pAlter->daysToKeep0 = htonl(pAlter->daysToKeep0); pAlter->daysToKeep0 = htonl(pAlter->daysToKeep0);
pAlter->daysToKeep1 = htonl(pAlter->daysToKeep1); pAlter->daysToKeep1 = htonl(pAlter->daysToKeep1);
...@@ -697,7 +673,7 @@ static int32_t mndProcessAlterDbReq(SMnodeMsg *pMsg) { ...@@ -697,7 +673,7 @@ static int32_t mndProcessAlterDbReq(SMnodeMsg *pMsg) {
dbObj.cfgVersion++; dbObj.cfgVersion++;
dbObj.updateTime = taosGetTimestampMs(); dbObj.updateTime = taosGetTimestampMs();
code = mndUpdateDb(pMnode, pMsg, pDb, &dbObj); code = mndUpdateDb(pMnode, pReq, pDb, &dbObj);
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
if (code != 0) { if (code != 0) {
...@@ -757,14 +733,15 @@ static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj * ...@@ -757,14 +733,15 @@ static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *
action.epSet = mndGetDnodeEpset(pDnode); action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
SDropVnodeReq *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup); SDropVnodeReq *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup);
if (pMsg == NULL) return -1; if (pReq == NULL) return -1;
action.pCont = pMsg; action.pCont = pReq;
action.contLen = sizeof(SCreateVnodeReq); action.contLen = sizeof(SCreateVnodeReq);
action.msgType = TDMT_DND_DROP_VNODE; action.msgType = TDMT_DND_DROP_VNODE;
action.acceptableCode = TSDB_CODE_DND_VNODE_NOT_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg); free(pReq);
return -1; return -1;
} }
} }
...@@ -795,35 +772,17 @@ static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *p ...@@ -795,35 +772,17 @@ static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *p
return 0; return 0;
} }
static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) { static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb) {
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) goto DROP_DB_OVER;
mError("db:%s, failed to drop since %s", pDb->name, terrstr());
return -1;
}
mDebug("trans:%d, used to drop db:%s", pTrans->id, pDb->name); mDebug("trans:%d, used to drop db:%s", pTrans->id, pDb->name);
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) { if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER;
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER;
goto DROP_DB_OVER; if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER;
} if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_DB_OVER;
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) {
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
goto DROP_DB_OVER;
}
if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto DROP_DB_OVER;
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
goto DROP_DB_OVER;
}
code = 0; code = 0;
...@@ -832,9 +791,9 @@ DROP_DB_OVER: ...@@ -832,9 +791,9 @@ DROP_DB_OVER:
return code; return code;
} }
static int32_t mndProcessDropDbReq(SMnodeMsg *pMsg) { static int32_t mndProcessDropDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SDropDbMsg *pDrop = pMsg->rpcMsg.pCont; SDropDbMsg *pDrop = pReq->rpcMsg.pCont;
mDebug("db:%s, start to drop", pDrop->db); mDebug("db:%s, start to drop", pDrop->db);
...@@ -850,7 +809,7 @@ static int32_t mndProcessDropDbReq(SMnodeMsg *pMsg) { ...@@ -850,7 +809,7 @@ static int32_t mndProcessDropDbReq(SMnodeMsg *pMsg) {
} }
} }
int32_t code = mndDropDb(pMnode, pMsg, pDb); int32_t code = mndDropDb(pMnode, pReq, pDb);
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
if (code != 0) { if (code != 0) {
...@@ -861,16 +820,16 @@ static int32_t mndProcessDropDbReq(SMnodeMsg *pMsg) { ...@@ -861,16 +820,16 @@ static int32_t mndProcessDropDbReq(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
static int32_t mndProcessUseDbReq(SMnodeMsg *pMsg) { static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SUseDbMsg *pUse = pMsg->rpcMsg.pCont; SUseDbMsg *pUse = pReq->rpcMsg.pCont;
pUse->vgVersion = htonl(pUse->vgVersion); pUse->vgVersion = htonl(pUse->vgVersion);
SDbObj *pDb = mndAcquireDb(pMnode, pUse->db); SDbObj *pDb = mndAcquireDb(pMnode, pUse->db);
if (pDb == NULL) { if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_EXIST; terrno = TSDB_CODE_MND_DB_NOT_EXIST;
mError("db:%s, failed to process use db msg since %s", pUse->db, terrstr()); mError("db:%s, failed to process use db req since %s", pUse->db, terrstr());
return -1; return -1;
} }
...@@ -922,19 +881,19 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pMsg) { ...@@ -922,19 +881,19 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pMsg) {
pRsp->vgNum = htonl(vindex); pRsp->vgNum = htonl(vindex);
pRsp->hashMethod = pDb->hashMethod; pRsp->hashMethod = pDb->hashMethod;
pMsg->pCont = pRsp; pReq->pCont = pRsp;
pMsg->contLen = contLen; pReq->contLen = contLen;
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
return 0; return 0;
} }
static int32_t mndProcessSyncDbReq(SMnodeMsg *pMsg) { static int32_t mndProcessSyncDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SSyncDbMsg *pSync = pMsg->rpcMsg.pCont; SSyncDbMsg *pSync = pReq->rpcMsg.pCont;
SDbObj *pDb = mndAcquireDb(pMnode, pSync->db); SDbObj *pDb = mndAcquireDb(pMnode, pSync->db);
if (pDb == NULL) { if (pDb == NULL) {
mError("db:%s, failed to process sync db msg since %s", pSync->db, terrstr()); mError("db:%s, failed to process sync db req since %s", pSync->db, terrstr());
return -1; return -1;
} }
...@@ -942,12 +901,12 @@ static int32_t mndProcessSyncDbReq(SMnodeMsg *pMsg) { ...@@ -942,12 +901,12 @@ static int32_t mndProcessSyncDbReq(SMnodeMsg *pMsg) {
return 0; return 0;
} }
static int32_t mndProcessCompactDbReq(SMnodeMsg *pMsg) { static int32_t mndProcessCompactDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SCompactDbMsg *pCompact = pMsg->rpcMsg.pCont; SCompactDbMsg *pCompact = pReq->rpcMsg.pCont;
SDbObj *pDb = mndAcquireDb(pMnode, pCompact->db); SDbObj *pDb = mndAcquireDb(pMnode, pCompact->db);
if (pDb == NULL) { if (pDb == NULL) {
mError("db:%s, failed to process compact db msg since %s", pCompact->db, terrstr()); mError("db:%s, failed to process compact db req since %s", pCompact->db, terrstr());
return -1; return -1;
} }
...@@ -955,8 +914,8 @@ static int32_t mndProcessCompactDbReq(SMnodeMsg *pMsg) { ...@@ -955,8 +914,8 @@ static int32_t mndProcessCompactDbReq(SMnodeMsg *pMsg) {
return 0; return 0;
} }
static int32_t mndGetDbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0; int32_t cols = 0;
......
...@@ -183,7 +183,7 @@ static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) { ...@@ -183,7 +183,7 @@ static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
} }
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) { static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
mTrace("dnode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew); mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
pOld->updateTime = pNew->updateTime; pOld->updateTime = pNew->updateTime;
return 0; return 0;
} }
......
...@@ -152,7 +152,7 @@ static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc) { ...@@ -152,7 +152,7 @@ static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc) {
} }
static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOldFunc, SFuncObj *pNewFunc) { static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOldFunc, SFuncObj *pNewFunc) {
mTrace("func:%s, perform update action, old_row:%p new_row:%p", pOldFunc->name, pOldFunc, pNewFunc); mTrace("func:%s, perform update action, old row:%p new row:%p", pOldFunc->name, pOldFunc, pNewFunc);
return 0; return 0;
} }
......
...@@ -208,7 +208,7 @@ static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) { ...@@ -208,7 +208,7 @@ static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) {
} }
static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew) { static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew) {
mTrace("mnode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew); mTrace("mnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
pOld->updateTime = pNew->updateTime; pOld->updateTime = pNew->updateTime;
return 0; return 0;
} }
......
...@@ -155,7 +155,7 @@ static int32_t mndQnodeActionDelete(SSdb *pSdb, SQnodeObj *pObj) { ...@@ -155,7 +155,7 @@ static int32_t mndQnodeActionDelete(SSdb *pSdb, SQnodeObj *pObj) {
} }
static int32_t mndQnodeActionUpdate(SSdb *pSdb, SQnodeObj *pOld, SQnodeObj *pNew) { static int32_t mndQnodeActionUpdate(SSdb *pSdb, SQnodeObj *pOld, SQnodeObj *pNew) {
mTrace("qnode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew); mTrace("qnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
pOld->updateTime = pNew->updateTime; pOld->updateTime = pNew->updateTime;
return 0; return 0;
} }
......
...@@ -155,7 +155,7 @@ static int32_t mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj) { ...@@ -155,7 +155,7 @@ static int32_t mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj) {
} }
static int32_t mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOld, SSnodeObj *pNew) { static int32_t mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOld, SSnodeObj *pNew) {
mTrace("snode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew); mTrace("snode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
pOld->updateTime = pNew->updateTime; pOld->updateTime = pNew->updateTime;
return 0; return 0;
} }
......
...@@ -178,7 +178,7 @@ static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) { ...@@ -178,7 +178,7 @@ static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
} }
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb) { static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb) {
mTrace("stb:%s, perform update action, old_row:%p new_row:%p", pOldStb->name, pOldStb, pNewStb); mTrace("stb:%s, perform update action, old row:%p new row:%p", pOldStb->name, pOldStb, pNewStb);
atomic_exchange_32(&pOldStb->updateTime, pNewStb->updateTime); atomic_exchange_32(&pOldStb->updateTime, pNewStb->updateTime);
atomic_exchange_32(&pOldStb->version, pNewStb->version); atomic_exchange_32(&pOldStb->version, pNewStb->version);
......
...@@ -192,7 +192,7 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) { ...@@ -192,7 +192,7 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) {
} }
static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) { static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) {
mTrace("user:%s, perform update action, old_row:%p new_row:%p", pOld->user, pOld, pNew); mTrace("user:%s, perform update action, old row:%p new row:%p", pOld->user, pOld, pNew);
memcpy(pOld->pass, pNew->pass, TSDB_PASSWORD_LEN); memcpy(pOld->pass, pNew->pass, TSDB_PASSWORD_LEN);
pOld->updateTime = pNew->updateTime; pOld->updateTime = pNew->updateTime;
return 0; return 0;
......
...@@ -165,7 +165,7 @@ static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup) { ...@@ -165,7 +165,7 @@ static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup) {
} }
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup) { static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup) {
mTrace("vgId:%d, perform update action, old_row:%p new_row:%p", pOldVgroup->vgId, pOldVgroup, pNewVgroup); mTrace("vgId:%d, perform update action, old row:%p new row:%p", pOldVgroup->vgId, pOldVgroup, pNewVgroup);
pOldVgroup->updateTime = pNewVgroup->updateTime; pOldVgroup->updateTime = pNewVgroup->updateTime;
pOldVgroup->version = pNewVgroup->version; pOldVgroup->version = pNewVgroup->version;
pOldVgroup->hashBegin = pNewVgroup->hashBegin; pOldVgroup->hashBegin = pNewVgroup->hashBegin;
...@@ -189,7 +189,7 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) { ...@@ -189,7 +189,7 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
} }
SCreateVnodeReq *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { SCreateVnodeReq *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) {
SCreateVnodeReq *pCreate = calloc(1, sizeof(SCreateVnodeReq)); SCreateVnodeReq *pCreate = calloc(1, sizeof(SCreateVnodeReq));
if (pCreate == NULL) { if (pCreate == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -248,7 +248,7 @@ SCreateVnodeReq *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbOb ...@@ -248,7 +248,7 @@ SCreateVnodeReq *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbOb
return pCreate; return pCreate;
} }
SDropVnodeReq *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { SDropVnodeReq *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) {
SDropVnodeReq *pDrop = calloc(1, sizeof(SDropVnodeReq)); SDropVnodeReq *pDrop = calloc(1, sizeof(SDropVnodeReq));
if (pDrop == NULL) { if (pDrop == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
......
...@@ -112,6 +112,9 @@ public: ...@@ -112,6 +112,9 @@ public:
int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const { int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const {
// todo // todo
vgInfo->vgId = 1; vgInfo->vgId = 1;
vgInfo->numOfEps = 1;
vgInfo->epAddr[0].port = 6030;
strcpy(vgInfo->epAddr[0].fqdn, "node1");
return 0; return 0;
} }
......
...@@ -234,7 +234,7 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { ...@@ -234,7 +234,7 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
static void vgroupInfoToEpSet(const SVgroupInfo* vg, SQueryNodeAddr* execNode) { static void vgroupInfoToEpSet(const SVgroupInfo* vg, SQueryNodeAddr* execNode) {
execNode->nodeId = vg->vgId; execNode->nodeId = vg->vgId;
execNode->inUse = 0; // todo execNode->inUse = vg->inUse;
execNode->numOfEps = vg->numOfEps; execNode->numOfEps = vg->numOfEps;
for (int8_t i = 0; i < vg->numOfEps; ++i) { for (int8_t i = 0; i < vg->numOfEps; ++i) {
execNode->epAddr[i] = vg->epAddr[i]; execNode->epAddr[i] = vg->epAddr[i];
......
...@@ -62,7 +62,7 @@ static bool fromObjectWithAlloc(const cJSON* json, const char* name, FFromJson f ...@@ -62,7 +62,7 @@ static bool fromObjectWithAlloc(const cJSON* json, const char* name, FFromJson f
return func(jObj, *obj); return func(jObj, *obj);
} }
static bool addArray(cJSON* json, const char* name, FToJson func, const SArray* array) { static bool addTarray(cJSON* json, const char* name, FToJson func, const SArray* array, bool isPoint) {
size_t size = (NULL == array) ? 0 : taosArrayGetSize(array); size_t size = (NULL == array) ? 0 : taosArrayGetSize(array);
if (size > 0) { if (size > 0) {
cJSON* jArray = cJSON_AddArrayToObject(json, name); cJSON* jArray = cJSON_AddArrayToObject(json, name);
...@@ -70,7 +70,7 @@ static bool addArray(cJSON* json, const char* name, FToJson func, const SArray* ...@@ -70,7 +70,7 @@ static bool addArray(cJSON* json, const char* name, FToJson func, const SArray*
return false; return false;
} }
for (size_t i = 0; i < size; ++i) { for (size_t i = 0; i < size; ++i) {
if (!addItem(jArray, func, taosArrayGetP(array, i))) { if (!addItem(jArray, func, isPoint ? taosArrayGetP(array, i) : taosArrayGet(array, i))) {
return false; return false;
} }
} }
...@@ -78,11 +78,19 @@ static bool addArray(cJSON* json, const char* name, FToJson func, const SArray* ...@@ -78,11 +78,19 @@ static bool addArray(cJSON* json, const char* name, FToJson func, const SArray*
return true; return true;
} }
static bool fromArray(const cJSON* json, const char* name, FFromJson func, SArray** array, int32_t itemSize) { static bool addInlineArray(cJSON* json, const char* name, FToJson func, const SArray* array) {
return addTarray(json, name, func, array, false);
}
static bool addArray(cJSON* json, const char* name, FToJson func, const SArray* array) {
return addTarray(json, name, func, array, true);
}
static bool fromTarray(const cJSON* json, const char* name, FFromJson func, SArray** array, int32_t itemSize, bool isPoint) {
const cJSON* jArray = cJSON_GetObjectItem(json, name); const cJSON* jArray = cJSON_GetObjectItem(json, name);
int32_t size = (NULL == jArray ? 0 : cJSON_GetArraySize(jArray)); int32_t size = (NULL == jArray ? 0 : cJSON_GetArraySize(jArray));
if (size > 0) { if (size > 0) {
*array = taosArrayInit(size, POINTER_BYTES); *array = taosArrayInit(size, isPoint ? POINTER_BYTES : itemSize);
if (NULL == *array) { if (NULL == *array) {
return false; return false;
} }
...@@ -92,11 +100,19 @@ static bool fromArray(const cJSON* json, const char* name, FFromJson func, SArra ...@@ -92,11 +100,19 @@ static bool fromArray(const cJSON* json, const char* name, FFromJson func, SArra
if (NULL == item || !func(cJSON_GetArrayItem(jArray, i), item)) { if (NULL == item || !func(cJSON_GetArrayItem(jArray, i), item)) {
return false; return false;
} }
taosArrayPush(*array, &item); taosArrayPush(*array, isPoint ? &item : item);
} }
return true; return true;
} }
static bool fromInlineArray(const cJSON* json, const char* name, FFromJson func, SArray** array, int32_t itemSize) {
return fromTarray(json, name, func, array, itemSize, false);
}
static bool fromArray(const cJSON* json, const char* name, FFromJson func, SArray** array, int32_t itemSize) {
return fromTarray(json, name, func, array, itemSize, true);
}
static bool addRawArray(cJSON* json, const char* name, FToJson func, const void* array, int32_t itemSize, int32_t size) { static bool addRawArray(cJSON* json, const char* name, FToJson func, const void* array, int32_t itemSize, int32_t size) {
if (size > 0) { if (size > 0) {
cJSON* jArray = cJSON_AddArrayToObject(json, name); cJSON* jArray = cJSON_AddArrayToObject(json, name);
...@@ -556,6 +572,32 @@ static bool epAddrFromJson(const cJSON* json, void* obj) { ...@@ -556,6 +572,32 @@ static bool epAddrFromJson(const cJSON* json, void* obj) {
return true; return true;
} }
static const char* jkNodeAddrId = "NodeId";
static const char* jkNodeAddrInUse = "InUse";
static const char* jkNodeAddrEpAddrs = "EpAddrs";
static bool nodeAddrToJson(const void* obj, cJSON* json) {
const SQueryNodeAddr* ep = (const SQueryNodeAddr*)obj;
bool res = cJSON_AddNumberToObject(json, jkNodeAddrId, ep->nodeId);
if (res) {
res = cJSON_AddNumberToObject(json, jkNodeAddrInUse, ep->inUse);
}
if (res) {
res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, ep->epAddr, ep->numOfEps, sizeof(SEpAddrMsg));
}
return res;
}
static bool nodeAddrFromJson(const cJSON* json, void* obj) {
SQueryNodeAddr* ep = (SQueryNodeAddr*)obj;
ep->nodeId = getNumber(json, jkNodeAddrId);
ep->inUse = getNumber(json, jkNodeAddrInUse);
int32_t numOfEps = 0;
bool res = fromRawArray(json, jkNodeAddrEpAddrs, nodeAddrFromJson, &ep->epAddr, sizeof(SEpAddrMsg), &numOfEps);
ep->numOfEps = numOfEps;
return res;
}
static const char* jkExchangeNodeSrcTemplateId = "SrcTemplateId"; static const char* jkExchangeNodeSrcTemplateId = "SrcTemplateId";
static const char* jkExchangeNodeSrcEndPoints = "SrcEndPoints"; static const char* jkExchangeNodeSrcEndPoints = "SrcEndPoints";
...@@ -563,7 +605,7 @@ static bool exchangeNodeToJson(const void* obj, cJSON* json) { ...@@ -563,7 +605,7 @@ static bool exchangeNodeToJson(const void* obj, cJSON* json) {
const SExchangePhyNode* exchange = (const SExchangePhyNode*)obj; const SExchangePhyNode* exchange = (const SExchangePhyNode*)obj;
bool res = cJSON_AddNumberToObject(json, jkExchangeNodeSrcTemplateId, exchange->srcTemplateId); bool res = cJSON_AddNumberToObject(json, jkExchangeNodeSrcTemplateId, exchange->srcTemplateId);
if (res) { if (res) {
res = addArray(json, jkExchangeNodeSrcEndPoints, epAddrToJson, exchange->pSrcEndPoints); res = addInlineArray(json, jkExchangeNodeSrcEndPoints, nodeAddrToJson, exchange->pSrcEndPoints);
} }
return res; return res;
} }
...@@ -571,7 +613,7 @@ static bool exchangeNodeToJson(const void* obj, cJSON* json) { ...@@ -571,7 +613,7 @@ static bool exchangeNodeToJson(const void* obj, cJSON* json) {
static bool exchangeNodeFromJson(const cJSON* json, void* obj) { static bool exchangeNodeFromJson(const cJSON* json, void* obj) {
SExchangePhyNode* exchange = (SExchangePhyNode*)obj; SExchangePhyNode* exchange = (SExchangePhyNode*)obj;
exchange->srcTemplateId = getNumber(json, jkExchangeNodeSrcTemplateId); exchange->srcTemplateId = getNumber(json, jkExchangeNodeSrcTemplateId);
return fromArray(json, jkExchangeNodeSrcEndPoints, epAddrFromJson, &exchange->pSrcEndPoints, sizeof(SEpAddrMsg)); return fromInlineArray(json, jkExchangeNodeSrcEndPoints, nodeAddrFromJson, &exchange->pSrcEndPoints, sizeof(SQueryNodeAddr));
} }
static bool specificPhyNodeToJson(const void* obj, cJSON* json) { static bool specificPhyNodeToJson(const void* obj, cJSON* json) {
...@@ -803,7 +845,6 @@ static cJSON* subplanToJson(const SSubplan* subplan) { ...@@ -803,7 +845,6 @@ static cJSON* subplanToJson(const SSubplan* subplan) {
if (res) { if (res) {
res = addObject(jSubplan, jkSubplanDataSink, dataSinkToJson, subplan->pDataSink); res = addObject(jSubplan, jkSubplanDataSink, dataSinkToJson, subplan->pDataSink);
} }
if (!res) { if (!res) {
cJSON_Delete(jSubplan); cJSON_Delete(jSubplan);
return NULL; return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册