提交 98b35306 编写于 作者: M Minghao Li

sync timeout

上级 8d789bba
...@@ -31,11 +31,11 @@ extern "C" { ...@@ -31,11 +31,11 @@ extern "C" {
typedef struct SSyncIO { typedef struct SSyncIO {
STaosQueue *pMsgQ; STaosQueue *pMsgQ;
STaosQset * pQset; STaosQset *pQset;
pthread_t consumerTid; pthread_t consumerTid;
void * serverRpc; void *serverRpc;
void * clientRpc; void *clientRpc;
SEpSet myAddr; SEpSet myAddr;
void *ioTimerTickQ; void *ioTimerTickQ;
...@@ -49,6 +49,7 @@ typedef struct SSyncIO { ...@@ -49,6 +49,7 @@ typedef struct SSyncIO {
int32_t (*FpOnSyncRequestVoteReply)(SSyncNode *pSyncNode, SyncRequestVoteReply *pMsg); int32_t (*FpOnSyncRequestVoteReply)(SSyncNode *pSyncNode, SyncRequestVoteReply *pMsg);
int32_t (*FpOnSyncAppendEntries)(SSyncNode *pSyncNode, SyncAppendEntries *pMsg); int32_t (*FpOnSyncAppendEntries)(SSyncNode *pSyncNode, SyncAppendEntries *pMsg);
int32_t (*FpOnSyncAppendEntriesReply)(SSyncNode *pSyncNode, SyncAppendEntriesReply *pMsg); int32_t (*FpOnSyncAppendEntriesReply)(SSyncNode *pSyncNode, SyncAppendEntriesReply *pMsg);
int32_t (*FpOnSyncTimeout)(SSyncNode *pSyncNode, SyncTimeout *pMsg);
int8_t isStart; int8_t isStart;
......
...@@ -71,6 +71,9 @@ extern int32_t sDebugFlag; ...@@ -71,6 +71,9 @@ extern int32_t sDebugFlag;
struct SRaft; struct SRaft;
typedef struct SRaft SRaft; typedef struct SRaft SRaft;
struct SyncTimeout;
typedef struct SyncTimeout SyncTimeout;
struct SyncPing; struct SyncPing;
typedef struct SyncPing SyncPing; typedef struct SyncPing SyncPing;
...@@ -174,6 +177,7 @@ typedef struct SSyncNode { ...@@ -174,6 +177,7 @@ typedef struct SSyncNode {
int32_t (*FpOnRequestVoteReply)(SSyncNode* ths, SyncRequestVoteReply* pMsg); int32_t (*FpOnRequestVoteReply)(SSyncNode* ths, SyncRequestVoteReply* pMsg);
int32_t (*FpOnAppendEntries)(SSyncNode* ths, SyncAppendEntries* pMsg); int32_t (*FpOnAppendEntries)(SSyncNode* ths, SyncAppendEntries* pMsg);
int32_t (*FpOnAppendEntriesReply)(SSyncNode* ths, SyncAppendEntriesReply* pMsg); int32_t (*FpOnAppendEntriesReply)(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
int32_t (*FpOnTimeout)(SSyncNode* pSyncNode, SyncTimeout* pMsg);
} SSyncNode; } SSyncNode;
......
...@@ -39,12 +39,26 @@ typedef enum ESyncMessageType { ...@@ -39,12 +39,26 @@ typedef enum ESyncMessageType {
SYNC_REQUEST_VOTE_REPLY = 111, SYNC_REQUEST_VOTE_REPLY = 111,
SYNC_APPEND_ENTRIES = 113, SYNC_APPEND_ENTRIES = 113,
SYNC_APPEND_ENTRIES_REPLY = 115, SYNC_APPEND_ENTRIES_REPLY = 115,
SYNC_TIMEOUT = 117,
} ESyncMessageType; } ESyncMessageType;
// --------------------------------------------- // ---------------------------------------------
cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg); cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg);
cJSON* syncRpcUnknownMsg2Json(); cJSON* syncRpcUnknownMsg2Json();
// ---------------------------------------------
typedef enum ESyncTimeoutType {
SYNC_TIMEOUT_PING = 0,
SYNC_TIMEOUT_ELECTION,
SYNC_TIMEOUT_HEARTBEAT,
} ESyncTimeoutType;
typedef struct SyncTimeout {
ESyncTimeoutType type;
void* data;
} SyncTimeout;
// --------------------------------------------- // ---------------------------------------------
typedef struct SyncPing { typedef struct SyncPing {
uint32_t bytes; uint32_t bytes;
......
...@@ -80,7 +80,7 @@ int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { ...@@ -80,7 +80,7 @@ int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) {
pMsg->msgType, pMsg->contLen); pMsg->msgType, pMsg->contLen);
{ {
cJSON *pJson = syncRpcMsg2Json(pMsg); cJSON *pJson = syncRpcMsg2Json(pMsg);
char * serialized = cJSON_Print(pJson); char *serialized = cJSON_Print(pJson);
sTrace("process syncMessage send: pMsg:%s ", serialized); sTrace("process syncMessage send: pMsg:%s ", serialized);
free(serialized); free(serialized);
cJSON_Delete(pJson); cJSON_Delete(pJson);
...@@ -211,7 +211,7 @@ static void *syncIOConsumerFunc(void *param) { ...@@ -211,7 +211,7 @@ static void *syncIOConsumerFunc(void *param) {
SSyncIO *io = param; SSyncIO *io = param;
STaosQall *qall; STaosQall *qall;
SRpcMsg * pRpcMsg, rpcMsg; SRpcMsg *pRpcMsg, rpcMsg;
int type; int type;
qall = taosAllocateQall(); qall = taosAllocateQall();
...@@ -244,6 +244,8 @@ static void *syncIOConsumerFunc(void *param) { ...@@ -244,6 +244,8 @@ static void *syncIOConsumerFunc(void *param) {
io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg); io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg); syncPingReplyDestroy(pSyncMsg);
} }
} else if (pRpcMsg->msgType == SYNC_TIMEOUT) {
} else { } else {
; ;
} }
......
...@@ -37,6 +37,7 @@ static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg); ...@@ -37,6 +37,7 @@ static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg);
static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg);
static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg);
// --------------------------------- // ---------------------------------
int32_t syncInit() { int32_t syncInit() {
...@@ -326,6 +327,11 @@ static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesR ...@@ -326,6 +327,11 @@ static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesR
return ret; return ret;
} }
static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
int32_t ret = 0;
return ret;
}
static void syncNodePingTimerCb(void* param, void* tmrId) { static void syncNodePingTimerCb(void* param, void* tmrId) {
SSyncNode* pSyncNode = (SSyncNode*)param; SSyncNode* pSyncNode = (SSyncNode*)param;
if (atomic_load_8(&pSyncNode->pingTimerEnable)) { if (atomic_load_8(&pSyncNode->pingTimerEnable)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册