提交 6eecbcd1 编写于 作者: M Minghao Li

refactor(sync): optimized one replica

上级 a6f33ba0
......@@ -26,9 +26,9 @@ extern "C" {
extern bool gRaftDetailLog;
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1
#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF
#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF
typedef uint64_t SyncNodeId;
typedef int32_t SyncGroupId;
......
......@@ -155,7 +155,6 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
code = 0;
}
}
}
......@@ -183,10 +182,12 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
tmsgSendRedirectRsp(&rsp, &newEpSet);
} else {
if (terrno != 0) code = terrno;
vError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", vgId, pMsg, tstrerror(code), code);
SRpcMsg rsp = {.code = code, .info = pMsg->info};
tmsgSendRsp(&rsp);
if (code != 1) {
if (terrno != 0) code = terrno;
vError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", vgId, pMsg, tstrerror(code), code);
SRpcMsg rsp = {.code = code, .info = pMsg->info};
tmsgSendRsp(&rsp);
}
}
vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code);
......
......@@ -28,13 +28,13 @@ extern "C" {
#include "trpc.h"
#include "ttimer.h"
#define TIMER_MAX_MS 0x7FFFFFFF
#define ENV_TICK_TIMER_MS 1000
#define PING_TIMER_MS 1000
#define ELECT_TIMER_MS_MIN 1300
#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2)
#define TIMER_MAX_MS 0x7FFFFFFF
#define ENV_TICK_TIMER_MS 1000
#define PING_TIMER_MS 1000
#define ELECT_TIMER_MS_MIN 1300
#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2)
#define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN)
#define HEARTBEAT_TIMER_MS 900
#define HEARTBEAT_TIMER_MS 900
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})
......
......@@ -50,7 +50,7 @@ typedef struct SSyncIO {
void *pSyncNode;
int32_t (*FpOnSyncPing)(SSyncNode *pSyncNode, SyncPing *pMsg);
int32_t (*FpOnSyncPingReply)(SSyncNode *pSyncNode, SyncPingReply *pMsg);
int32_t (*FpOnSyncClientRequest)(SSyncNode *pSyncNode, SyncClientRequest *pMsg, SyncIndex* pRetIndex);
int32_t (*FpOnSyncClientRequest)(SSyncNode *pSyncNode, SyncClientRequest *pMsg, SyncIndex *pRetIndex);
int32_t (*FpOnSyncRequestVote)(SSyncNode *pSyncNode, SyncRequestVote *pMsg);
int32_t (*FpOnSyncRequestVoteReply)(SSyncNode *pSyncNode, SyncRequestVoteReply *pMsg);
int32_t (*FpOnSyncAppendEntries)(SSyncNode *pSyncNode, SyncAppendEntries *pMsg);
......
......@@ -49,14 +49,14 @@ int32_t raftCfgClose(SRaftCfg *pRaftCfg);
int32_t raftCfgPersist(SRaftCfg *pRaftCfg);
int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex);
cJSON *syncCfg2Json(SSyncCfg *pSyncCfg);
char *syncCfg2Str(SSyncCfg *pSyncCfg);
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg);
cJSON * syncCfg2Json(SSyncCfg *pSyncCfg);
char * syncCfg2Str(SSyncCfg *pSyncCfg);
char * syncCfg2SimpleStr(SSyncCfg *pSyncCfg);
int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg);
int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg);
cJSON *raftCfg2Json(SRaftCfg *pRaftCfg);
char *raftCfg2Str(SRaftCfg *pRaftCfg);
cJSON * raftCfg2Json(SRaftCfg *pRaftCfg);
char * raftCfg2Str(SRaftCfg *pRaftCfg);
int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg);
int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg);
......
......@@ -30,7 +30,7 @@ static int32_t syncIODestroy(SSyncIO *io);
static int32_t syncIOStartInternal(SSyncIO *io);
static int32_t syncIOStopInternal(SSyncIO *io);
static void *syncIOConsumerFunc(void *param);
static void * syncIOConsumerFunc(void *param);
static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
......@@ -242,9 +242,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
}
static void *syncIOConsumerFunc(void *param) {
SSyncIO *io = param;
SSyncIO * io = param;
STaosQall *qall;
SRpcMsg *pRpcMsg, rpcMsg;
SRpcMsg * pRpcMsg, rpcMsg;
qall = taosAllocateQall();
while (1) {
......
......@@ -126,7 +126,7 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) {
cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......
......@@ -2654,25 +2654,33 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
syncEntry2OriginalRpc(pEntry, &rpcMsg);
// user commit
bool internalExecute = (ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType);
if (ths->replicaNum == 1) {
internalExecute = syncNodeIsOptimizedOneReplica(ths, &rpcMsg) && !(ths->restoreFinish);
}
if ((ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType)) {
bool internalExecute = true;
if ((ths->replicaNum == 1) && ths->restoreFinish && (ths->vgId != 1)) {
internalExecute = false;
}
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "index:%ld, internalExecute:%d", i, internalExecute);
syncNodeEventLog(ths, logBuf);
} while (0);
// execute fsm in apply thread, or execute outside syncPropose
if (internalExecute) {
SFsmCbMeta cbMeta = {0};
cbMeta.index = pEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = 0;
cbMeta.state = ths->state;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.term = pEntry->term;
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.flag = flag;
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
// execute fsm in apply thread, or execute outside syncPropose
if (internalExecute) {
SFsmCbMeta cbMeta = {0};
cbMeta.index = pEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = 0;
cbMeta.state = ths->state;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.term = pEntry->term;
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.flag = flag;
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
// config change
......
......@@ -101,7 +101,7 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
char *syncCfg2Str(SSyncCfg *pSyncCfg) {
cJSON *pJson = syncCfg2Json(pSyncCfg);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......@@ -109,7 +109,7 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) {
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) {
if (pSyncCfg != NULL) {
int32_t len = 512;
char *s = taosMemoryMalloc(len);
char * s = taosMemoryMalloc(len);
memset(s, 0, len);
snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex);
......@@ -205,7 +205,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
char *raftCfg2Str(SRaftCfg *pRaftCfg) {
cJSON *pJson = raftCfg2Json(pRaftCfg);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......@@ -271,7 +271,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
(pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring);
}
cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
ASSERT(code == 0);
......
......@@ -216,7 +216,7 @@ cJSON *raftStore2Json(SRaftStore *pRaftStore) {
char *raftStore2Str(SRaftStore *pRaftStore) {
cJSON *pJson = raftStore2Json(pRaftStore);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......
......@@ -314,14 +314,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
cJSON *pJson = snapshotSender2Json(pSender);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
int32_t len = 256;
char *s = taosMemoryMalloc(len);
char * s = taosMemoryMalloc(len);
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
char host[128];
......@@ -461,7 +461,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
cJSON_AddStringToObject(pFromId, "addr", u64buf);
{
uint64_t u64 = pReceiver->fromId.addr;
cJSON *pTmp = pFromId;
cJSON * pTmp = pFromId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
......@@ -494,14 +494,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
cJSON *pJson = snapshotReceiver2Json(pReceiver);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) {
int32_t len = 256;
char *s = taosMemoryMalloc(len);
char * s = taosMemoryMalloc(len);
SRaftId fromId = pReceiver->fromId;
char host[128];
......
......@@ -127,7 +127,7 @@ cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) {
char *voteGranted2Str(SVotesGranted *pVotesGranted) {
cJSON *pJson = voteGranted2Json(pVotesGranted);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......@@ -256,7 +256,7 @@ cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) {
char *votesRespond2Str(SVotesRespond *pVotesRespond) {
cJSON *pJson = votesRespond2Json(pVotesRespond);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......
......@@ -113,7 +113,7 @@ void test2() {
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
//pLogStore->syncLogSetBeginIndex(pLogStore, 5);
// pLogStore->syncLogSetBeginIndex(pLogStore, 5);
pLogStore->syncLogRestoreFromSnapshot(pLogStore, 4);
logStoreLog2((char*)"\n\n\ntest2 ----- ", pLogStore);
......@@ -229,7 +229,7 @@ void test4() {
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest4 ----- ", pLogStore);
//pLogStore->syncLogSetBeginIndex(pLogStore, 5);
// pLogStore->syncLogSetBeginIndex(pLogStore, 5);
pLogStore->syncLogRestoreFromSnapshot(pLogStore, 4);
for (int i = 5; i <= 9; ++i) {
......@@ -291,7 +291,7 @@ void test5() {
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest5 ----- ", pLogStore);
//pLogStore->syncLogSetBeginIndex(pLogStore, 5);
// pLogStore->syncLogSetBeginIndex(pLogStore, 5);
pLogStore->syncLogRestoreFromSnapshot(pLogStore, 4);
for (int i = 5; i <= 9; ++i) {
......@@ -412,26 +412,23 @@ void test6() {
do {
SyncIndex firstVer = walGetFirstVer(pWal);
SyncIndex lastVer = walGetLastVer(pWal);
bool isEmpty = walIsEmpty(pWal);
bool isEmpty = walIsEmpty(pWal);
printf("before -------- firstVer:%ld lastVer:%ld isEmpty:%d \n", firstVer, lastVer, isEmpty);
} while (0);
logStoreDestory(pLogStore);
cleanup();
// restart
init();
pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
do {
SyncIndex firstVer = walGetFirstVer(pWal);
SyncIndex lastVer = walGetLastVer(pWal);
bool isEmpty = walIsEmpty(pWal);
bool isEmpty = walIsEmpty(pWal);
printf("after -------- firstVer:%ld lastVer:%ld isEmpty:%d \n", firstVer, lastVer, isEmpty);
} while (0);
......@@ -461,13 +458,13 @@ int main(int argc, char** argv) {
}
sTrace("gAssert : %d", gAssert);
/*
test1();
test2();
test3();
test4();
test5();
*/
/*
test1();
test2();
test3();
test4();
test5();
*/
test6();
return 0;
......
......@@ -312,7 +312,7 @@ void test5() {
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest5 ----- ", pLogStore);
//pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, 6);
// pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, 6);
pLogStore->syncLogRestoreFromSnapshot(pSyncNode->pLogStore, 5);
for (int i = 6; i <= 10; ++i) {
int32_t dataLen = 10;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册