提交 21191ae2 编写于 作者: M Minghao Li

enh(sync): add SSyncSnapshotReceiver

上级 6d8733a2
...@@ -56,20 +56,22 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); ...@@ -56,20 +56,22 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender);
char *snapshotSender2Str(SSyncSnapshotSender *pSender); char *snapshotSender2Str(SSyncSnapshotSender *pSender);
typedef struct SSyncSnapshotReceiver { typedef struct SSyncSnapshotReceiver {
bool start; bool start;
int32_t ack;
void *pWriter; int32_t ack;
void *pCurrentBlock; void *pWriter;
int32_t len; void *pCurrentBlock;
int32_t blockLen;
SyncTerm term;
SSyncNode *pSyncNode; SSyncNode *pSyncNode;
int32_t replicaIndex; int32_t replicaIndex;
} SSyncSnapshotReceiver; } SSyncSnapshotReceiver;
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode); SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t replicaIndex);
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
int32_t snapshotReceive(SSyncSnapshotReceiver *pReceiver);
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
......
...@@ -115,10 +115,6 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender) { ...@@ -115,10 +115,6 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender) {
taosMemoryFree(pSender->pCurrentBlock); taosMemoryFree(pSender->pCurrentBlock);
pSender->blockLen = 0; pSender->blockLen = 0;
} }
ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
&(pSender->pCurrentBlock), &(pSender->blockLen));
ASSERT(ret == 0);
} }
// send msg from seq, seq is already updated // send msg from seq, seq is already updated
...@@ -206,19 +202,102 @@ char *snapshotSender2Str(SSyncSnapshotSender *pSender) { ...@@ -206,19 +202,102 @@ char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
} }
// ------------------------------------- // -------------------------------------
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode) { return NULL; } SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
ASSERT(pSyncNode->pFsm->FpSnapshotStartWrite != NULL);
ASSERT(pSyncNode->pFsm->FpSnapshotStopWrite != NULL);
ASSERT(pSyncNode->pFsm->FpSnapshotDoWrite != NULL);
SSyncSnapshotReceiver *pReceiver = taosMemoryMalloc(sizeof(SSyncSnapshotReceiver));
ASSERT(pReceiver != NULL);
memset(pReceiver, 0, sizeof(*pReceiver));
pReceiver->start = false;
pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
pReceiver->pWriter = NULL;
pReceiver->pCurrentBlock = NULL;
pReceiver->blockLen = 0;
pReceiver->pSyncNode = pSyncNode;
pReceiver->replicaIndex = replicaIndex;
pReceiver->term = pSyncNode->pRaftStore->currentTerm;
return pReceiver;
}
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
if (pReceiver != NULL) {
taosMemoryFree(pReceiver);
}
}
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver) {
if (!(pReceiver->start)) {
pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter));
ASSERT(ret == 0);
if (pReceiver->pCurrentBlock != NULL) {
taosMemoryFree(pReceiver->pCurrentBlock);
pReceiver->pCurrentBlock = NULL;
pReceiver->blockLen = 0;
}
pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm;
} else {
ASSERT(0);
}
}
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true);
ASSERT(ret == 0);
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {} if (pReceiver->pCurrentBlock != NULL) {
taosMemoryFree(pReceiver->pCurrentBlock);
pReceiver->blockLen = 0;
}
}
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver) {} cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char u64buf[128];
cJSON *pRoot = cJSON_CreateObject();
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {} if (pReceiver != NULL) {
cJSON_AddNumberToObject(pRoot, "start", pReceiver->start);
cJSON_AddNumberToObject(pRoot, "ack", pReceiver->ack);
int32_t snapshotReceive(SSyncSnapshotReceiver *pReceiver) { return 0; } snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pWriter);
cJSON_AddStringToObject(pRoot, "pWriter", u64buf);
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { return NULL; } snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pCurrentBlock);
cJSON_AddStringToObject(pRoot, "pCurrentBlock", u64buf);
cJSON_AddNumberToObject(pRoot, "blockLen", pReceiver->blockLen);
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { return NULL; } if (pReceiver->pCurrentBlock != NULL) {
char *s;
s = syncUtilprintBin((char *)(pReceiver->pCurrentBlock), pReceiver->blockLen);
cJSON_AddStringToObject(pRoot, "pCurrentBlock", s);
taosMemoryFree(s);
s = syncUtilprintBin2((char *)(pReceiver->pCurrentBlock), pReceiver->blockLen);
cJSON_AddStringToObject(pRoot, "pCurrentBlock2", s);
taosMemoryFree(s);
}
snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pSyncNode);
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
cJSON_AddNumberToObject(pRoot, "replicaIndex", pReceiver->replicaIndex);
snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
}
cJSON *pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SSyncSnapshotReceiver", pRoot);
return pJson;
}
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
cJSON *pJson = snapshotReceiver2Json(pReceiver);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
SSyncSnapshotReceiver *pReceiver = NULL; SSyncSnapshotReceiver *pReceiver = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册