提交 0d7272a3 编写于 作者: M Minghao Li

fix(sync): sending snapshot

上级 84a67e85
...@@ -284,7 +284,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { ...@@ -284,7 +284,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char *snapshotSender2Str(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
cJSON *pJson = snapshotSender2Json(pSender); cJSON *pJson = snapshotSender2Json(pSender);
char *serialized = cJSON_Print(pJson); char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
...@@ -398,7 +398,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { ...@@ -398,7 +398,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
cJSON *pJson = snapshotReceiver2Json(pReceiver); cJSON *pJson = snapshotReceiver2Json(pReceiver);
char *serialized = cJSON_Print(pJson); char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
......
...@@ -24,6 +24,9 @@ SyncIndex gSnapshotLastApplyIndex; ...@@ -24,6 +24,9 @@ SyncIndex gSnapshotLastApplyIndex;
SyncIndex gSnapshotLastApplyTerm; SyncIndex gSnapshotLastApplyTerm;
int gIterTimes = 0; int gIterTimes = 0;
SyncIndex gFinishLastApplyIndex;
SyncIndex gFinishLastApplyTerm;
void init() { void init() {
int code = walInit(); int code = walInit();
assert(code == 0); assert(code == 0);
...@@ -111,22 +114,25 @@ int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32 ...@@ -111,22 +114,25 @@ int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32
int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void** ppWriter) { int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void** ppWriter) {
*ppWriter = (void*)0xCDEF; *ppWriter = (void*)0xCDEF;
char logBuf[256] = {0}; char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartWrite== pFsm:%p, *ppWriter:%p", pFsm, *ppWriter); snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartWrite== pFsm:%p, *ppWriter:%p", pFsm, *ppWriter);
sTrace("%s", logBuf); sTrace("%s", logBuf);
return 0; return 0;
} }
int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) { int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) {
char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStopWrite== pFsm:%p, pWriter:%p, isApply:%d", pFsm, pWriter,
isApply);
sTrace("%s", logBuf);
if (isApply) { if (isApply) {
gSnapshotLastApplyIndex = 10; gSnapshotLastApplyIndex = gFinishLastApplyIndex;
gSnapshotLastApplyTerm = 1; gSnapshotLastApplyTerm = gFinishLastApplyTerm;
} }
char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf),
"==callback== ==SnapshotStopWrite== pFsm:%p, pWriter:%p, isApply:%d, gSnapshotLastApplyIndex:%ld, "
"gSnapshotLastApplyTerm:%ld",
pFsm, pWriter, isApply, gSnapshotLastApplyIndex, gSnapshotLastApplyTerm);
sTrace("%s", logBuf);
return 0; return 0;
} }
...@@ -258,7 +264,7 @@ void usage(char* exe) { ...@@ -258,7 +264,7 @@ void usage(char* exe) {
printf( printf(
"usage: %s replicaNum(1-5) myIndex(0-..) enableSnapshot(0/1) lastApplyIndex(>=-1) lastApplyTerm(>=0) " "usage: %s replicaNum(1-5) myIndex(0-..) enableSnapshot(0/1) lastApplyIndex(>=-1) lastApplyTerm(>=0) "
"writeRecordNum(>=0) " "writeRecordNum(>=0) "
"isStandBy(0/1) isConfigChange(0-5) iterTimes(>=0) \n", "isStandBy(0/1) isConfigChange(0-5) iterTimes(>=0) finishLastApplyIndex(>=-1) finishLastApplyTerm(>=0) \n",
exe); exe);
} }
...@@ -277,7 +283,7 @@ int main(int argc, char** argv) { ...@@ -277,7 +283,7 @@ int main(int argc, char** argv) {
tsAsyncLog = 0; tsAsyncLog = 0;
sDebugFlag = DEBUG_SCREEN + DEBUG_FILE + DEBUG_TRACE + DEBUG_INFO + DEBUG_ERROR; sDebugFlag = DEBUG_SCREEN + DEBUG_FILE + DEBUG_TRACE + DEBUG_INFO + DEBUG_ERROR;
if (argc != 10) { if (argc != 12) {
usage(argv[0]); usage(argv[0]);
exit(-1); exit(-1);
} }
...@@ -291,12 +297,14 @@ int main(int argc, char** argv) { ...@@ -291,12 +297,14 @@ int main(int argc, char** argv) {
bool isStandBy = atoi(argv[7]); bool isStandBy = atoi(argv[7]);
bool isConfigChange = atoi(argv[8]); bool isConfigChange = atoi(argv[8]);
int32_t iterTimes = atoi(argv[9]); int32_t iterTimes = atoi(argv[9]);
int32_t finishLastApplyIndex = atoi(argv[10]);
int32_t finishLastApplyTerm = atoi(argv[11]);
sTrace( sTrace(
"args: replicaNum:%d, myIndex:%d, enableSnapshot:%d, lastApplyIndex:%d, lastApplyTerm:%d, writeRecordNum:%d, " "args: replicaNum:%d, myIndex:%d, enableSnapshot:%d, lastApplyIndex:%d, lastApplyTerm:%d, writeRecordNum:%d, "
"isStandBy:%d, isConfigChange:%d, iterTimes:%d", "isStandBy:%d, isConfigChange:%d, iterTimes:%d, finishLastApplyIndex:%d, finishLastApplyTerm:%d",
replicaNum, myIndex, enableSnapshot, lastApplyIndex, lastApplyTerm, writeRecordNum, isStandBy, isConfigChange, replicaNum, myIndex, enableSnapshot, lastApplyIndex, lastApplyTerm, writeRecordNum, isStandBy, isConfigChange,
iterTimes); iterTimes, finishLastApplyIndex, finishLastApplyTerm);
// check parameter // check parameter
assert(replicaNum >= 1 && replicaNum <= 5); assert(replicaNum >= 1 && replicaNum <= 5);
...@@ -306,6 +314,8 @@ int main(int argc, char** argv) { ...@@ -306,6 +314,8 @@ int main(int argc, char** argv) {
assert(writeRecordNum >= 0); assert(writeRecordNum >= 0);
assert(isConfigChange >= 0 && isConfigChange <= 5); assert(isConfigChange >= 0 && isConfigChange <= 5);
assert(iterTimes >= 0); assert(iterTimes >= 0);
assert(finishLastApplyIndex >= -1);
assert(finishLastApplyTerm >= 0);
char logFile[256]; char logFile[256];
snprintf(logFile, sizeof(logFile), "/tmp/%s-replicaNum%d-myIndex%d.log", gDir, replicaNum, myIndex); snprintf(logFile, sizeof(logFile), "/tmp/%s-replicaNum%d-myIndex%d.log", gDir, replicaNum, myIndex);
...@@ -316,6 +326,9 @@ int main(int argc, char** argv) { ...@@ -316,6 +326,9 @@ int main(int argc, char** argv) {
gSnapshotLastApplyTerm = lastApplyTerm; gSnapshotLastApplyTerm = lastApplyTerm;
gIterTimes = iterTimes; gIterTimes = iterTimes;
gFinishLastApplyIndex = finishLastApplyIndex;
gFinishLastApplyTerm = finishLastApplyTerm;
init(); init();
int32_t ret = syncIOStart((char*)"127.0.0.1", gPorts[myIndex]); int32_t ret = syncIOStart((char*)"127.0.0.1", gPorts[myIndex]);
assert(ret == 0); assert(ret == 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册