未验证 提交 fd027bef 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #13950 from taosdata/fix/dnode

enh: let rollback stage stage also need to be consistent
......@@ -62,6 +62,7 @@ typedef struct SSyncCfg {
typedef struct SFsmCbMeta {
SyncIndex index;
SyncIndex lastConfigIndex;
bool isWeak;
int32_t code;
ESyncState state;
......@@ -75,6 +76,7 @@ typedef struct SReConfigCbMeta {
int32_t code;
SyncIndex index;
SyncTerm term;
SyncIndex lastConfigIndex;
SyncTerm currentTerm;
SSyncCfg oldCfg;
SSyncCfg newCfg;
......
......@@ -441,108 +441,6 @@ int32_t* taosGetErrno();
#define TSDB_CODE_WAL_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x1004)
#define TSDB_CODE_WAL_LOG_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x1005)
// http
#define TSDB_CODE_HTTP_SERVER_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x1100) //"http server is not online"
#define TSDB_CODE_HTTP_UNSUPPORT_URL TAOS_DEF_ERROR_CODE(0, 0x1101) //"url is not support"
#define TSDB_CODE_HTTP_INVALID_URL TAOS_DEF_ERROR_CODE(0, 0x1102) //invalid url format"
#define TSDB_CODE_HTTP_NO_ENOUGH_MEMORY TAOS_DEF_ERROR_CODE(0, 0x1103) //"no enough memory"
#define TSDB_CODE_HTTP_REQUSET_TOO_BIG TAOS_DEF_ERROR_CODE(0, 0x1104) //"request size is too big"
#define TSDB_CODE_HTTP_NO_AUTH_INFO TAOS_DEF_ERROR_CODE(0, 0x1105) //"no auth info input"
#define TSDB_CODE_HTTP_NO_MSG_INPUT TAOS_DEF_ERROR_CODE(0, 0x1106) //"request is empty"
#define TSDB_CODE_HTTP_NO_SQL_INPUT TAOS_DEF_ERROR_CODE(0, 0x1107) //"no sql input"
#define TSDB_CODE_HTTP_NO_EXEC_USEDB TAOS_DEF_ERROR_CODE(0, 0x1108) //"no need to execute use db cmd"
#define TSDB_CODE_HTTP_SESSION_FULL TAOS_DEF_ERROR_CODE(0, 0x1109) //"session list was full"
#define TSDB_CODE_HTTP_GEN_TAOSD_TOKEN_ERR TAOS_DEF_ERROR_CODE(0, 0x110A) //"generate taosd token error"
#define TSDB_CODE_HTTP_INVALID_MULTI_REQUEST TAOS_DEF_ERROR_CODE(0, 0x110B) //"size of multi request is 0"
#define TSDB_CODE_HTTP_CREATE_GZIP_FAILED TAOS_DEF_ERROR_CODE(0, 0x110C) //"failed to create gzip"
#define TSDB_CODE_HTTP_FINISH_GZIP_FAILED TAOS_DEF_ERROR_CODE(0, 0x110D) //"failed to finish gzip"
#define TSDB_CODE_HTTP_LOGIN_FAILED TAOS_DEF_ERROR_CODE(0, 0x110E) //"failed to login"
#define TSDB_CODE_HTTP_INVALID_VERSION TAOS_DEF_ERROR_CODE(0, 0x1120) //"invalid http version"
#define TSDB_CODE_HTTP_INVALID_CONTENT_LENGTH TAOS_DEF_ERROR_CODE(0, 0x1121) //"invalid content length"
#define TSDB_CODE_HTTP_INVALID_AUTH_TYPE TAOS_DEF_ERROR_CODE(0, 0x1122) //"invalid type of Authorization"
#define TSDB_CODE_HTTP_INVALID_AUTH_FORMAT TAOS_DEF_ERROR_CODE(0, 0x1123) //"invalid format of Authorization"
#define TSDB_CODE_HTTP_INVALID_BASIC_AUTH TAOS_DEF_ERROR_CODE(0, 0x1124) //"invalid basic Authorization"
#define TSDB_CODE_HTTP_INVALID_TAOSD_AUTH TAOS_DEF_ERROR_CODE(0, 0x1125) //"invalid taosd Authorization"
#define TSDB_CODE_HTTP_PARSE_METHOD_FAILED TAOS_DEF_ERROR_CODE(0, 0x1126) //"failed to parse method"
#define TSDB_CODE_HTTP_PARSE_TARGET_FAILED TAOS_DEF_ERROR_CODE(0, 0x1127) //"failed to parse target"
#define TSDB_CODE_HTTP_PARSE_VERSION_FAILED TAOS_DEF_ERROR_CODE(0, 0x1128) //"failed to parse http version"
#define TSDB_CODE_HTTP_PARSE_SP_FAILED TAOS_DEF_ERROR_CODE(0, 0x1129) //"failed to parse sp"
#define TSDB_CODE_HTTP_PARSE_STATUS_FAILED TAOS_DEF_ERROR_CODE(0, 0x112A) //"failed to parse status"
#define TSDB_CODE_HTTP_PARSE_PHRASE_FAILED TAOS_DEF_ERROR_CODE(0, 0x112B) //"failed to parse phrase"
#define TSDB_CODE_HTTP_PARSE_CRLF_FAILED TAOS_DEF_ERROR_CODE(0, 0x112C) //"failed to parse crlf"
#define TSDB_CODE_HTTP_PARSE_HEADER_FAILED TAOS_DEF_ERROR_CODE(0, 0x112D) //"failed to parse header"
#define TSDB_CODE_HTTP_PARSE_HEADER_KEY_FAILED TAOS_DEF_ERROR_CODE(0, 0x112E) //"failed to parse header key"
#define TSDB_CODE_HTTP_PARSE_HEADER_VAL_FAILED TAOS_DEF_ERROR_CODE(0, 0x112F) //"failed to parse header val"
#define TSDB_CODE_HTTP_PARSE_CHUNK_SIZE_FAILED TAOS_DEF_ERROR_CODE(0, 0x1130) //"failed to parse chunk size"
#define TSDB_CODE_HTTP_PARSE_CHUNK_FAILED TAOS_DEF_ERROR_CODE(0, 0x1131) //"failed to parse chunk"
#define TSDB_CODE_HTTP_PARSE_END_FAILED TAOS_DEF_ERROR_CODE(0, 0x1132) //"failed to parse end section"
#define TSDB_CODE_HTTP_PARSE_INVALID_STATE TAOS_DEF_ERROR_CODE(0, 0x1134) //"invalid parse state"
#define TSDB_CODE_HTTP_PARSE_ERROR_STATE TAOS_DEF_ERROR_CODE(0, 0x1135) //"failed to parse error section"
#define TSDB_CODE_HTTP_GC_QUERY_NULL TAOS_DEF_ERROR_CODE(0, 0x1150) //"query size is 0"
#define TSDB_CODE_HTTP_GC_QUERY_SIZE TAOS_DEF_ERROR_CODE(0, 0x1151) //"query size can not more than 100"
#define TSDB_CODE_HTTP_GC_REQ_PARSE_ERROR TAOS_DEF_ERROR_CODE(0, 0x1152) //"parse grafana json error"
#define TSDB_CODE_HTTP_TG_DB_NOT_INPUT TAOS_DEF_ERROR_CODE(0, 0x1160) //"database name can not be null"
#define TSDB_CODE_HTTP_TG_DB_TOO_LONG TAOS_DEF_ERROR_CODE(0, 0x1161) //"database name too long"
#define TSDB_CODE_HTTP_TG_INVALID_JSON TAOS_DEF_ERROR_CODE(0, 0x1162) //"invalid telegraf json fromat"
#define TSDB_CODE_HTTP_TG_METRICS_NULL TAOS_DEF_ERROR_CODE(0, 0x1163) //"metrics size is 0"
#define TSDB_CODE_HTTP_TG_METRICS_SIZE TAOS_DEF_ERROR_CODE(0, 0x1164) //"metrics size can not more than 1K"
#define TSDB_CODE_HTTP_TG_METRIC_NULL TAOS_DEF_ERROR_CODE(0, 0x1165) //"metric name not find"
#define TSDB_CODE_HTTP_TG_METRIC_TYPE TAOS_DEF_ERROR_CODE(0, 0x1166) //"metric name type should be string"
#define TSDB_CODE_HTTP_TG_METRIC_NAME_NULL TAOS_DEF_ERROR_CODE(0, 0x1167) //"metric name length is 0"
#define TSDB_CODE_HTTP_TG_METRIC_NAME_LONG TAOS_DEF_ERROR_CODE(0, 0x1168) //"metric name length too long"
#define TSDB_CODE_HTTP_TG_TIMESTAMP_NULL TAOS_DEF_ERROR_CODE(0, 0x1169) //"timestamp not find"
#define TSDB_CODE_HTTP_TG_TIMESTAMP_TYPE TAOS_DEF_ERROR_CODE(0, 0x116A) //"timestamp type should be integer"
#define TSDB_CODE_HTTP_TG_TIMESTAMP_VAL_NULL TAOS_DEF_ERROR_CODE(0, 0x116B) //"timestamp value smaller than 0"
#define TSDB_CODE_HTTP_TG_TAGS_NULL TAOS_DEF_ERROR_CODE(0, 0x116C) //"tags not find"
#define TSDB_CODE_HTTP_TG_TAGS_SIZE_0 TAOS_DEF_ERROR_CODE(0, 0x116D) //"tags size is 0"
#define TSDB_CODE_HTTP_TG_TAGS_SIZE_LONG TAOS_DEF_ERROR_CODE(0, 0x116E) //"tags size too long"
#define TSDB_CODE_HTTP_TG_TAG_NULL TAOS_DEF_ERROR_CODE(0, 0x116F) //"tag is null"
#define TSDB_CODE_HTTP_TG_TAG_NAME_NULL TAOS_DEF_ERROR_CODE(0, 0x1170) //"tag name is null"
#define TSDB_CODE_HTTP_TG_TAG_NAME_SIZE TAOS_DEF_ERROR_CODE(0, 0x1171) //"tag name length too long"
#define TSDB_CODE_HTTP_TG_TAG_VALUE_TYPE TAOS_DEF_ERROR_CODE(0, 0x1172) //"tag value type should be number or string"
#define TSDB_CODE_HTTP_TG_TAG_VALUE_NULL TAOS_DEF_ERROR_CODE(0, 0x1173) //"tag value is null"
#define TSDB_CODE_HTTP_TG_TABLE_NULL TAOS_DEF_ERROR_CODE(0, 0x1174) //"table is null"
#define TSDB_CODE_HTTP_TG_TABLE_SIZE TAOS_DEF_ERROR_CODE(0, 0x1175) //"table name length too long"
#define TSDB_CODE_HTTP_TG_FIELDS_NULL TAOS_DEF_ERROR_CODE(0, 0x1176) //"fields not find"
#define TSDB_CODE_HTTP_TG_FIELDS_SIZE_0 TAOS_DEF_ERROR_CODE(0, 0x1177) //"fields size is 0"
#define TSDB_CODE_HTTP_TG_FIELDS_SIZE_LONG TAOS_DEF_ERROR_CODE(0, 0x1178) //"fields size too long"
#define TSDB_CODE_HTTP_TG_FIELD_NULL TAOS_DEF_ERROR_CODE(0, 0x1179) //"field is null"
#define TSDB_CODE_HTTP_TG_FIELD_NAME_NULL TAOS_DEF_ERROR_CODE(0, 0x117A) //"field name is null"
#define TSDB_CODE_HTTP_TG_FIELD_NAME_SIZE TAOS_DEF_ERROR_CODE(0, 0x117B) //"field name length too long"
#define TSDB_CODE_HTTP_TG_FIELD_VALUE_TYPE TAOS_DEF_ERROR_CODE(0, 0x117C) //"field value type should be number or string"
#define TSDB_CODE_HTTP_TG_FIELD_VALUE_NULL TAOS_DEF_ERROR_CODE(0, 0x117D) //"field value is null"
#define TSDB_CODE_HTTP_TG_HOST_NOT_STRING TAOS_DEF_ERROR_CODE(0, 0x117E) //"host type should be string"
#define TSDB_CODE_HTTP_TG_STABLE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x117F) //"stable not exist"
#define TSDB_CODE_HTTP_OP_DB_NOT_INPUT TAOS_DEF_ERROR_CODE(0, 0x1190) //"database name can not be null"
#define TSDB_CODE_HTTP_OP_DB_TOO_LONG TAOS_DEF_ERROR_CODE(0, 0x1191) //"database name too long"
#define TSDB_CODE_HTTP_OP_INVALID_JSON TAOS_DEF_ERROR_CODE(0, 0x1192) //"invalid opentsdb json fromat"
#define TSDB_CODE_HTTP_OP_METRICS_NULL TAOS_DEF_ERROR_CODE(0, 0x1193) //"metrics size is 0"
#define TSDB_CODE_HTTP_OP_METRICS_SIZE TAOS_DEF_ERROR_CODE(0, 0x1194) //"metrics size can not more than 10K"
#define TSDB_CODE_HTTP_OP_METRIC_NULL TAOS_DEF_ERROR_CODE(0, 0x1195) //"metric name not find"
#define TSDB_CODE_HTTP_OP_METRIC_TYPE TAOS_DEF_ERROR_CODE(0, 0x1196) //"metric name type should be string"
#define TSDB_CODE_HTTP_OP_METRIC_NAME_NULL TAOS_DEF_ERROR_CODE(0, 0x1197) //"metric name length is 0"
#define TSDB_CODE_HTTP_OP_METRIC_NAME_LONG TAOS_DEF_ERROR_CODE(0, 0x1198) //"metric name length can not more than 22"
#define TSDB_CODE_HTTP_OP_TIMESTAMP_NULL TAOS_DEF_ERROR_CODE(0, 0x1199) //"timestamp not find"
#define TSDB_CODE_HTTP_OP_TIMESTAMP_TYPE TAOS_DEF_ERROR_CODE(0, 0x119A) //"timestamp type should be integer"
#define TSDB_CODE_HTTP_OP_TIMESTAMP_VAL_NULL TAOS_DEF_ERROR_CODE(0, 0x119B) //"timestamp value smaller than 0"
#define TSDB_CODE_HTTP_OP_TAGS_NULL TAOS_DEF_ERROR_CODE(0, 0x119C) //"tags not find"
#define TSDB_CODE_HTTP_OP_TAGS_SIZE_0 TAOS_DEF_ERROR_CODE(0, 0x119D) //"tags size is 0"
#define TSDB_CODE_HTTP_OP_TAGS_SIZE_LONG TAOS_DEF_ERROR_CODE(0, 0x119E) //"tags size too long"
#define TSDB_CODE_HTTP_OP_TAG_NULL TAOS_DEF_ERROR_CODE(0, 0x119F) //"tag is null"
#define TSDB_CODE_HTTP_OP_TAG_NAME_NULL TAOS_DEF_ERROR_CODE(0, 0x11A0) //"tag name is null"
#define TSDB_CODE_HTTP_OP_TAG_NAME_SIZE TAOS_DEF_ERROR_CODE(0, 0x11A1) //"tag name length too long"
#define TSDB_CODE_HTTP_OP_TAG_VALUE_TYPE TAOS_DEF_ERROR_CODE(0, 0x11A2) //"tag value type should be boolean number or string"
#define TSDB_CODE_HTTP_OP_TAG_VALUE_NULL TAOS_DEF_ERROR_CODE(0, 0x11A3) //"tag value is null"
#define TSDB_CODE_HTTP_OP_TAG_VALUE_TOO_LONG TAOS_DEF_ERROR_CODE(0, 0x11A4) //"tag value can not more than 64"
#define TSDB_CODE_HTTP_OP_VALUE_NULL TAOS_DEF_ERROR_CODE(0, 0x11A5) //"value not find"
#define TSDB_CODE_HTTP_OP_VALUE_TYPE TAOS_DEF_ERROR_CODE(0, 0x11A6) //"value type should be boolean number or string"
#define TSDB_CODE_HTTP_REQUEST_JSON_ERROR TAOS_DEF_ERROR_CODE(0, 0x1F00) //"http request json error"
// tfs
#define TSDB_CODE_FS_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x2200)
#define TSDB_CODE_FS_INVLD_CFG TAOS_DEF_ERROR_CODE(0, 0x2201)
......
......@@ -46,13 +46,14 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw);
pMgmt->errCode = cbMeta.code;
mDebug("trans:%d, is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64 " role:%s raw:%p", transId,
pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term, syncStr(cbMeta.state), pRaw);
mDebug("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
" role:%s raw:%p",
transId, pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex, syncStr(cbMeta.state),
pRaw);
if (pMgmt->errCode == 0) {
sdbWriteWithoutFree(pMnode->pSdb, pRaw);
sdbSetApplyIndex(pMnode->pSdb, cbMeta.index);
sdbSetApplyTerm(pMnode->pSdb, cbMeta.term);
sdbSetApplyInfo(pMnode->pSdb, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex);
}
if (pMgmt->transId == transId) {
......@@ -68,36 +69,19 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
mndReleaseTrans(pMnode, pTrans);
}
if (cbMeta.index - sdbGetApplyIndex(pMnode->pSdb) > 100) {
SSnapshotMeta sMeta = {0};
// if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
if (syncGetSnapshotMetaByIndex(pMnode->syncMgmt.sync, cbMeta.index, &sMeta) == 0) {
sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex);
}
sdbWriteFile(pMnode->pSdb);
}
sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA);
}
}
int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
SMnode *pMnode = pFsm->data;
pSnapshot->lastApplyIndex = sdbGetCommitIndex(pMnode->pSdb);
pSnapshot->lastApplyTerm = sdbGetCommitTerm(pMnode->pSdb);
pSnapshot->lastConfigIndex = sdbGetCurConfig(pMnode->pSdb);
sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex);
return 0;
}
void mndRestoreFinish(struct SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data;
SSnapshotMeta sMeta = {0};
// if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
SyncIndex snapshotIndex = sdbGetApplyIndex(pMnode->pSdb);
if (syncGetSnapshotMetaByIndex(pMnode->syncMgmt.sync, snapshotIndex, &sMeta) == 0) {
sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex);
}
if (!pMnode->deploy) {
mInfo("mnode sync restore finished, and will handle outstanding transactions");
mndTransPullup(pMnode);
......
......@@ -22,8 +22,8 @@
#include "mndSync.h"
#include "mndUser.h"
#define TRANS_VER_NUMBER 1
#define TRANS_ARRAY_SIZE 8
#define TRANS_VER_NUMBER 1
#define TRANS_ARRAY_SIZE 8
#define TRANS_RESERVE_SIZE 64
static SSdbRaw *mndTransActionEncode(STrans *pTrans);
......@@ -52,7 +52,7 @@ static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans);
static bool mndCantExecuteTransAction(SMnode *pMnode) { return !pMnode->deploy && !mndIsMaster(pMnode); }
static bool mndCannotExecuteTransAction(SMnode *pMnode) { return !pMnode->deploy && !mndIsMaster(pMnode); }
static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans);
static int32_t mndProcessTransReq(SRpcMsg *pReq);
......@@ -523,9 +523,10 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
}
if (pOld->stage == TRN_STAGE_ROLLBACK) {
pOld->stage = TRN_STAGE_FINISHED;
mTrace("trans:%d, stage from rollback to finished since perform update action", pNew->id);
pOld->stage = TRN_STAGE_REDO_ACTION;
mTrace("trans:%d, stage from rollback to undoAction since perform update action", pNew->id);
}
return 0;
}
......@@ -937,7 +938,7 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi
static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
if (pAction->msgSent) return 0;
if (mndCantExecuteTransAction(pMnode)) return -1;
if (mndCannotExecuteTransAction(pMnode)) return -1;
int64_t signature = pTrans->id;
signature = (signature << 32);
......@@ -1137,7 +1138,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
pTrans->lastEpset = pAction->epSet;
}
if (mndCantExecuteTransAction(pMnode)) break;
if (mndCannotExecuteTransAction(pMnode)) break;
if (code == 0) {
pTrans->code = 0;
......@@ -1180,7 +1181,7 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
code = mndTransExecuteRedoActions(pMnode, pTrans);
}
if (mndCantExecuteTransAction(pMnode)) return false;
if (mndCannotExecuteTransAction(pMnode)) return false;
if (code == 0) {
pTrans->code = 0;
......@@ -1193,8 +1194,8 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
} else {
pTrans->code = terrno;
if (pTrans->policy == TRN_POLICY_ROLLBACK) {
pTrans->stage = TRN_STAGE_UNDO_ACTION;
mError("trans:%d, stage from redoAction to undoAction since %s", pTrans->id, terrstr());
pTrans->stage = TRN_STAGE_ROLLBACK;
mError("trans:%d, stage from redoAction to rollback since %s", pTrans->id, terrstr());
continueExec = true;
} else {
pTrans->failedTimes++;
......@@ -1207,7 +1208,7 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
}
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
if (mndCantExecuteTransAction(pMnode)) return false;
if (mndCannotExecuteTransAction(pMnode)) return false;
bool continueExec = true;
int32_t code = mndTransCommit(pMnode, pTrans);
......@@ -1219,16 +1220,9 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
continueExec = true;
} else {
pTrans->code = terrno;
if (pTrans->policy == TRN_POLICY_ROLLBACK) {
pTrans->stage = TRN_STAGE_UNDO_ACTION;
mError("trans:%d, stage from commit to undoAction since %s, failedTimes:%d", pTrans->id, terrstr(),
pTrans->failedTimes);
continueExec = true;
} else {
pTrans->failedTimes++;
mError("trans:%d, stage keep on commit since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
continueExec = false;
}
pTrans->failedTimes++;
mError("trans:%d, stage keep on commit since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
continueExec = false;
}
return continueExec;
......@@ -1257,11 +1251,9 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
int32_t code = mndTransExecuteUndoActions(pMnode, pTrans);
if (mndCantExecuteTransAction(pMnode)) return false;
if (code == 0) {
pTrans->stage = TRN_STAGE_ROLLBACK;
mDebug("trans:%d, stage from undoAction to rollback", pTrans->id);
pTrans->stage = TRN_STAGE_FINISHED;
mDebug("trans:%d, stage from undoAction to finished", pTrans->id);
continueExec = true;
} else if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
mDebug("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code));
......@@ -1276,14 +1268,14 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
}
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
if (mndCantExecuteTransAction(pMnode)) return false;
if (mndCannotExecuteTransAction(pMnode)) return false;
bool continueExec = true;
int32_t code = mndTransRollback(pMnode, pTrans);
if (code == 0) {
pTrans->stage = TRN_STAGE_FINISHED;
mDebug("trans:%d, stage from rollback to finished", pTrans->id);
pTrans->stage = TRN_STAGE_UNDO_ACTION;
mDebug("trans:%d, stage from rollback to undoAction", pTrans->id);
continueExec = true;
} else {
pTrans->failedTimes++;
......@@ -1331,12 +1323,12 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
case TRN_STAGE_COMMIT_ACTION:
continueExec = mndTransPerformCommitActionStage(pMnode, pTrans);
break;
case TRN_STAGE_UNDO_ACTION:
continueExec = mndTransPerformUndoActionStage(pMnode, pTrans);
break;
case TRN_STAGE_ROLLBACK:
continueExec = mndTransPerformRollbackStage(pMnode, pTrans);
break;
case TRN_STAGE_UNDO_ACTION:
continueExec = mndTransPerformUndoActionStage(pMnode, pTrans);
break;
case TRN_STAGE_FINISHED:
continueExec = mndTransPerfromFinishedStage(pMnode, pTrans);
break;
......@@ -1438,13 +1430,8 @@ void mndTransPullup(SMnode *pMnode) {
mndReleaseTrans(pMnode, pTrans);
}
SSnapshotMeta sMeta = {0};
// if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
SyncIndex snapshotIndex = sdbGetApplyIndex(pMnode->pSdb);
if (syncGetSnapshotMetaByIndex(pMnode->syncMgmt.sync, snapshotIndex, &sMeta) == 0) {
sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex);
}
sdbWriteFile(pMnode->pSdb);
// todo, set to SDB_WRITE_DELTA
sdbWriteFile(pMnode->pSdb, 0);
taosArrayDestroy(pArray);
}
......
......@@ -493,8 +493,11 @@ TEST_F(MndTestSdb, 01_Write_Str) {
ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 2);
ASSERT_EQ(sdbGetMaxId(pSdb, SDB_USER), -1);
ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 2);
sdbSetApplyIndex(pSdb, -1);
ASSERT_EQ(sdbGetApplyIndex(pSdb), -1);
sdbSetApplyInfo(pSdb, -1, -1, -1);
// int64_t index, config;
// int64_t term;
// sdbGetCommitInfo(pSdb, &index, &term, &config);
// ASSERT_EQ(index, -1);
ASSERT_EQ(mnode.insertTimes, 2);
ASSERT_EQ(mnode.deleteTimes, 0);
......@@ -700,11 +703,12 @@ TEST_F(MndTestSdb, 01_Write_Str) {
}
// write version
sdbSetApplyIndex(pSdb, 0);
sdbSetApplyIndex(pSdb, 1);
ASSERT_EQ(sdbGetApplyIndex(pSdb), 1);
ASSERT_EQ(sdbWriteFile(pSdb), 0);
ASSERT_EQ(sdbWriteFile(pSdb), 0);
sdbSetApplyInfo(pSdb, 0, 0, 0);
sdbSetApplyInfo(pSdb, 1, 0, 0);
// sdbGetApplyInfo(pSdb, &index, &term, &config);
// ASSERT_EQ(index, 1);
ASSERT_EQ(sdbWriteFile(pSdb, 0), 0);
ASSERT_EQ(sdbWriteFile(pSdb, 0), 0);
sdbCleanup(pSdb);
ASSERT_EQ(mnode.insertTimes, 7);
......@@ -772,7 +776,11 @@ TEST_F(MndTestSdb, 01_Read_Str) {
ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 2);
ASSERT_EQ(sdbGetMaxId(pSdb, SDB_USER), -1);
ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 5);
ASSERT_EQ(sdbGetApplyIndex(pSdb), 1);
int64_t index, config;
int64_t term;
sdbGetCommitInfo(pSdb, &index, &term, &config);
ASSERT_EQ(index, 1);
ASSERT_EQ(mnode.insertTimes, 4);
ASSERT_EQ(mnode.deleteTimes, 0);
......
......@@ -37,6 +37,8 @@ extern "C" {
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }}
// clang-format on
#define SDB_WRITE_DELTA 100
#define SDB_GET_VAL(pData, dataPos, val, pos, func, type) \
{ \
if (func(pRaw, dataPos, val) != 0) { \
......@@ -258,7 +260,7 @@ int32_t sdbReadFile(SSdb *pSdb);
* @param pSdb The sdb object.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t sdbWriteFile(SSdb *pSdb);
int32_t sdbWriteFile(SSdb *pSdb, int32_t delta);
/**
* @brief Parse and write raw data to sdb, then free the pRaw object
......@@ -362,14 +364,8 @@ int64_t sdbGetTableVer(SSdb *pSdb, ESdbType type);
* @param index The update value of the apply index.
* @return int32_t The current index of sdb
*/
void sdbSetApplyIndex(SSdb *pSdb, int64_t index);
void sdbSetApplyTerm(SSdb *pSdb, int64_t term);
void sdbSetCurConfig(SSdb *pSdb, int64_t config);
int64_t sdbGetApplyIndex(SSdb *pSdb);
int64_t sdbGetApplyTerm(SSdb *pSdb);
int64_t sdbGetCommitIndex(SSdb *pSdb);
int64_t sdbGetCommitTerm(SSdb *pSdb);
int64_t sdbGetCurConfig(SSdb *pSdb);
void sdbSetApplyInfo(SSdb *pSdb, int64_t index, int64_t term, int64_t config);
void sdbGetCommitInfo(SSdb *pSdb, int64_t *index, int64_t *term, int64_t *config);
SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen);
void sdbFreeRaw(SSdbRaw *pRaw);
......
......@@ -68,7 +68,7 @@ SSdb *sdbInit(SSdbOpt *pOption) {
void sdbCleanup(SSdb *pSdb) {
mDebug("start to cleanup sdb");
sdbWriteFile(pSdb);
sdbWriteFile(pSdb, 0);
if (pSdb->currDir != NULL) {
taosMemoryFreeClear(pSdb->currDir);
......@@ -160,23 +160,20 @@ static int32_t sdbCreateDir(SSdb *pSdb) {
return 0;
}
void sdbSetApplyIndex(SSdb *pSdb, int64_t index) { pSdb->applyIndex = index; }
void sdbSetApplyTerm(SSdb *pSdb, int64_t term) { pSdb->applyTerm = term; }
void sdbSetCurConfig(SSdb *pSdb, int64_t config) {
if (pSdb->applyConfig != config) {
mDebug("mnode sync config set from %" PRId64 " to %" PRId64, pSdb->applyConfig, config);
pSdb->applyConfig = config;
}
void sdbSetApplyInfo(SSdb *pSdb, int64_t index, int64_t term, int64_t config) {
mTrace("mnode apply info changed, from index:%" PRId64 " term:%" PRId64 " config:%" PRId64 ", to index:%" PRId64
" term:%" PRId64 " config:%" PRId64,
pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig, index, term, config);
pSdb->applyIndex = index;
pSdb->applyTerm = term;
pSdb->applyConfig = config;
}
int64_t sdbGetApplyIndex(SSdb *pSdb) { return pSdb->applyIndex; }
int64_t sdbGetApplyTerm(SSdb *pSdb) { return pSdb->applyTerm; }
int64_t sdbGetCommitIndex(SSdb *pSdb) { return pSdb->commitIndex; }
int64_t sdbGetCommitTerm(SSdb *pSdb) { return pSdb->commitTerm; }
int64_t sdbGetCurConfig(SSdb *pSdb) { return pSdb->commitConfig; }
\ No newline at end of file
void sdbGetCommitInfo(SSdb *pSdb, int64_t *index, int64_t *term, int64_t *config) {
*index = pSdb->commitIndex;
*term = pSdb->commitTerm;
*config = pSdb->commitConfig;
mTrace("mnode current info, apply index:%" PRId64 " term:%" PRId64 " config:%" PRId64 ", commit index:%" PRId64
" term:%" PRId64 " config:%" PRId64,
pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig, *index, *term, *config);
}
......@@ -445,12 +445,16 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
return code;
}
int32_t sdbWriteFile(SSdb *pSdb) {
int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) {
int32_t code = 0;
if (pSdb->applyIndex == pSdb->commitIndex) {
return 0;
}
if (pSdb->applyIndex - pSdb->commitIndex < delta) {
return 0;
}
taosThreadMutexLock(&pSdb->filelock);
if (pSdb->pWal != NULL) {
code = walBeginSnapshot(pSdb->pWal, pSdb->applyIndex);
......@@ -475,7 +479,7 @@ int32_t sdbDeploy(SSdb *pSdb) {
return -1;
}
if (sdbWriteFile(pSdb) != 0) {
if (sdbWriteFile(pSdb, 0) != 0) {
return -1;
}
......
......@@ -180,8 +180,8 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
for (int32_t i = 0; i < numOfMsgs; ++i) {
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
vTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p", vgId, pMsg, TMSG_INFO(pMsg->msgType),
pMsg->info.handle);
vTrace("vgId:%d, msg:%p get from vnode-apply queue, index:%" PRId64 " type:%s handle:%p", vgId, pMsg,
pMsg->info.conn.applyIndex, TMSG_INFO(pMsg->msgType), pMsg->info.handle);
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
if (rsp.code == 0) {
......@@ -334,8 +334,8 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
rpcMsg.info.conn.applyIndex = cbMeta.index;
vInfo("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode),
TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta.seqNum, rpcMsg.info.handle);
vInfo("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " index:%" PRId64 " handle:%p",
TD_VID(pVnode), TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta.seqNum, cbMeta.index, rpcMsg.info.handle);
if (rpcMsg.info.handle != NULL) {
tmsgSendRsp(&rpcMsg);
}
......
......@@ -171,7 +171,8 @@ void syncNodeClose(SSyncNode* pSyncNode);
int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak);
// option
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex);
// ping --------------
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
......
......@@ -47,14 +47,15 @@ typedef struct SRaftCfg {
SRaftCfg *raftCfgOpen(const char *path);
int32_t raftCfgClose(SRaftCfg *pRaftCfg);
int32_t raftCfgPersist(SRaftCfg *pRaftCfg);
int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex);
cJSON * syncCfg2Json(SSyncCfg *pSyncCfg);
char * syncCfg2Str(SSyncCfg *pSyncCfg);
cJSON *syncCfg2Json(SSyncCfg *pSyncCfg);
char *syncCfg2Str(SSyncCfg *pSyncCfg);
int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg);
int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg);
cJSON * raftCfg2Json(SRaftCfg *pRaftCfg);
char * raftCfg2Str(SRaftCfg *pRaftCfg);
cJSON *raftCfg2Json(SRaftCfg *pRaftCfg);
char *raftCfg2Str(SRaftCfg *pRaftCfg);
int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg);
int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg);
......
......@@ -208,8 +208,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
SFsmCbMeta cbMeta;
SFsmCbMeta cbMeta = {0};
cbMeta.index = pRollBackEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pRollBackEntry->isWeak;
cbMeta.code = 0;
cbMeta.state = ths->state;
......@@ -234,8 +235,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
if (ths->pFsm != NULL) {
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
SFsmCbMeta cbMeta = {0};
cbMeta.index = pAppendEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pAppendEntry->isWeak;
cbMeta.code = 2;
cbMeta.state = ths->state;
......@@ -266,8 +268,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
if (ths->pFsm != NULL) {
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
SFsmCbMeta cbMeta = {0};
cbMeta.index = pAppendEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pAppendEntry->isWeak;
cbMeta.code = 3;
cbMeta.state = ths->state;
......@@ -696,8 +699,9 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
SFsmCbMeta cbMeta;
SFsmCbMeta cbMeta = {0};
cbMeta.index = pRollBackEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pRollBackEntry->isWeak;
cbMeta.code = 0;
cbMeta.state = ths->state;
......@@ -725,8 +729,9 @@ static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) {
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
SFsmCbMeta cbMeta = {0};
cbMeta.index = pEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = 2;
cbMeta.state = ths->state;
......
......@@ -444,6 +444,21 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct
return 0;
}
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);
SyncIndex lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[0];
for (int i = 0; i < pSyncNode->pRaftCfg->configIndexCount; ++i) {
if ((pSyncNode->pRaftCfg->configIndexArr)[i] > lastIndex &&
(pSyncNode->pRaftCfg->configIndexArr)[i] <= snapshotLastApplyIndex) {
lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[i];
}
}
sTrace("sync syncNodeGetSnapshotConfigIndex index:%ld lastConfigIndex:%ld", snapshotLastApplyIndex, lastIndex);
return lastIndex;
}
const char* syncGetMyRoleStr(int64_t rid) {
const char* s = syncUtilState2String(syncGetMyRole(rid));
return s;
......@@ -2068,8 +2083,9 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
if (ths->pFsm != NULL) {
// if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
SFsmCbMeta cbMeta = {0};
cbMeta.index = pEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = 0;
cbMeta.state = ths->state;
......@@ -2090,8 +2106,9 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
if (ths->pFsm != NULL) {
// if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
SFsmCbMeta cbMeta = {0};
cbMeta.index = pEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = 1;
cbMeta.state = ths->state;
......@@ -2155,11 +2172,12 @@ static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
}
*/
if (ths->pFsm->FpLeaderTransferCb != NULL) {
SFsmCbMeta cbMeta;
SFsmCbMeta cbMeta = {0};
cbMeta.code = 0;
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.flag = 0;
cbMeta.index = pEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pEntry->isWeak;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.state = ths->state;
......@@ -2261,6 +2279,7 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
cbMeta.code = 0;
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.index = pEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index);
cbMeta.term = pEntry->term;
cbMeta.newCfg = newSyncCfg;
cbMeta.oldCfg = oldSyncCfg;
......@@ -2295,8 +2314,9 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
// user commit
if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
SFsmCbMeta cbMeta = {0};
cbMeta.index = pEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = 0;
cbMeta.state = ths->state;
......@@ -2310,6 +2330,8 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
// config change
if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
raftCfgAddConfigIndex(ths->pRaftCfg, pEntry->index);
raftCfgPersist(ths->pRaftCfg);
code = syncNodeConfigChange(ths, &rpcMsg, pEntry);
ASSERT(code == 0);
}
......
......@@ -66,6 +66,13 @@ int32_t raftCfgPersist(SRaftCfg *pRaftCfg) {
return 0;
}
int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex) {
ASSERT(pRaftCfg->configIndexCount <= MAX_CONFIG_INDEX_COUNT);
(pRaftCfg->configIndexArr)[pRaftCfg->configIndexCount] = configIndex;
++(pRaftCfg->configIndexCount);
return 0;
}
cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
char u64buf[128] = {0};
cJSON *pRoot = cJSON_CreateObject();
......
......@@ -50,6 +50,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
} else {
sError("snapshotSenderCreate cannot create sender");
}
return pSender;
}
......@@ -84,6 +85,10 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
// get current snapshot info
pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot));
sTrace("snapshotSenderStart lastApplyIndex:%ld, lastApplyTerm:%lu, lastConfigIndex:%ld",
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
if (pSender->snapshot.lastConfigIndex != SYNC_INDEX_INVALID) {
/*
SSyncRaftEntry *pEntry = NULL;
......@@ -421,7 +426,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
cJSON *pJson = snapshotSender2Json(pSender);
char * serialized = cJSON_Print(pJson);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......@@ -542,7 +547,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
cJSON_AddStringToObject(pFromId, "addr", u64buf);
{
uint64_t u64 = pReceiver->fromId.addr;
cJSON * pTmp = pFromId;
cJSON *pTmp = pFromId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
......@@ -566,7 +571,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
cJSON *pJson = snapshotReceiver2Json(pReceiver);
char * serialized = cJSON_Print(pJson);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......
......@@ -108,7 +108,7 @@ python3 ./test.py -f 2-query/distribute_agg_apercentile.py
python3 ./test.py -f 6-cluster/5dnode1mnode.py
python3 ./test.py -f 6-cluster/5dnode2mnode.py
#python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py
python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py
#python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py
python3 ./test.py -f 7-tmq/basic5.py
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册