提交 b75da82c 编写于 作者: M Minghao Li

sync timeout

上级 98b35306
......@@ -31,11 +31,11 @@ extern "C" {
typedef struct SSyncIO {
STaosQueue *pMsgQ;
STaosQset *pQset;
STaosQset * pQset;
pthread_t consumerTid;
void *serverRpc;
void *clientRpc;
void * serverRpc;
void * clientRpc;
SEpSet myAddr;
void *ioTimerTickQ;
......
......@@ -30,7 +30,8 @@ extern "C" {
// encode as uint32
typedef enum ESyncMessageType {
SYNC_UNKNOWN = 99,
SYNC_UNKNOWN = 77,
SYNC_TIMEOUT = 99,
SYNC_PING = 101,
SYNC_PING_REPLY = 103,
SYNC_CLIENT_REQUEST = 105,
......@@ -39,7 +40,7 @@ typedef enum ESyncMessageType {
SYNC_REQUEST_VOTE_REPLY = 111,
SYNC_APPEND_ENTRIES = 113,
SYNC_APPEND_ENTRIES_REPLY = 115,
SYNC_TIMEOUT = 117,
} ESyncMessageType;
// ---------------------------------------------
......@@ -48,17 +49,28 @@ cJSON* syncRpcUnknownMsg2Json();
// ---------------------------------------------
typedef enum ESyncTimeoutType {
SYNC_TIMEOUT_PING = 0,
SYNC_TIMEOUT_PING = 100,
SYNC_TIMEOUT_ELECTION,
SYNC_TIMEOUT_HEARTBEAT,
} ESyncTimeoutType;
typedef struct SyncTimeout {
ESyncTimeoutType type;
uint32_t bytes;
uint32_t msgType;
ESyncTimeoutType timeoutType;
void* data;
} SyncTimeout;
SyncTimeout* syncTimeoutBuild();
void syncTimeoutDestroy(SyncTimeout* pMsg);
void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen);
void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg);
void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg);
void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg);
cJSON* syncTimeout2Json(const SyncTimeout* pMsg);
SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, void* data);
// ---------------------------------------------
typedef struct SyncPing {
uint32_t bytes;
......
......@@ -246,7 +246,14 @@ static void *syncIOConsumerFunc(void *param) {
}
} else if (pRpcMsg->msgType == SYNC_TIMEOUT) {
} else {
if (io->FpOnSyncTimeout != NULL) {
SyncTimeout *pSyncMsg;
pSyncMsg = syncTimeoutBuild();
syncTimeoutFromRpcMsg(pRpcMsg, pSyncMsg);
io->FpOnSyncTimeout(io->pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
}
} else {
;
}
}
......
......@@ -27,6 +27,10 @@ static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNo
static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
static void syncNodePingTimerCb(void* param, void* tmrId);
static void syncNodeEqPingTimer(void* param, void* tmrId);
static void syncNodeEqElectTimer(void* param, void* tmrId);
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg);
static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg);
......@@ -95,7 +99,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->pPingTimer = NULL;
pSyncNode->pingTimerMS = 1000;
atomic_store_8(&pSyncNode->pingTimerEnable, 0);
pSyncNode->FpPingTimer = syncNodePingTimerCb;
pSyncNode->FpPingTimer = syncNodeEqPingTimer;
pSyncNode->pingTimerCounter = 0;
pSyncNode->FpOnPing = syncNodeOnPingCb;
......@@ -104,6 +108,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb;
pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb;
pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb;
pSyncNode->FpOnTimeout = syncNodeOnTimeoutCb;
return pSyncNode;
}
......@@ -329,6 +334,27 @@ static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesR
static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
int32_t ret = 0;
sTrace("<-- syncNodeOnTimeoutCb -->");
{
cJSON* pJson = syncTimeout2Json(pMsg);
char* serialized = cJSON_Print(pJson);
sTrace("process syncMessage recv: syncNodeOnTimeoutCb pMsg:%s ", serialized);
free(serialized);
cJSON_Delete(pJson);
}
if (pMsg->timeoutType == SYNC_TIMEOUT_PING) {
if (atomic_load_8(&ths->pingTimerEnable)) {
++(ths->pingTimerCounter);
syncNodePingAll(ths);
}
} else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) {
} else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) {
} else {
}
return ret;
}
......@@ -336,7 +362,6 @@ static void syncNodePingTimerCb(void* param, void* tmrId) {
SSyncNode* pSyncNode = (SSyncNode*)param;
if (atomic_load_8(&pSyncNode->pingTimerEnable)) {
++(pSyncNode->pingTimerCounter);
// pSyncNode->pingTimerMS += 100;
sTrace(
"syncNodePingTimerCb: pSyncNode->pingTimerCounter:%lu, pSyncNode->pingTimerMS:%d, pSyncNode->pPingTimer:%p, "
......@@ -350,4 +375,26 @@ static void syncNodePingTimerCb(void* param, void* tmrId) {
} else {
sTrace("syncNodePingTimerCb: pingTimerEnable:%u ", pSyncNode->pingTimerEnable);
}
}
\ No newline at end of file
}
static void syncNodeEqPingTimer(void* param, void* tmrId) {
SSyncNode* pSyncNode = (SSyncNode*)param;
if (atomic_load_8(&pSyncNode->pingTimerEnable)) {
// pSyncNode->pingTimerMS += 100;
SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, pSyncNode);
SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
syncTimeoutDestroy(pSyncMsg);
taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager,
&pSyncNode->pPingTimer);
} else {
sTrace("syncNodeEqPingTimer: pingTimerEnable:%u ", pSyncNode->pingTimerEnable);
}
}
static void syncNodeEqElectTimer(void* param, void* tmrId) {}
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {}
\ No newline at end of file
......@@ -24,7 +24,12 @@ void onMessage(SRaft* pRaft, void* pMsg) {}
cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
cJSON* pRoot;
if (pRpcMsg->msgType == SYNC_PING) {
// in compiler optimization, switch case = if else constants
if (pRpcMsg->msgType == SYNC_TIMEOUT) {
SyncTimeout* pSyncMsg = (SyncTimeout*)pRpcMsg->pCont;
pRoot = syncTimeout2Json(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_PING) {
SyncPing* pSyncMsg = (SyncPing*)pRpcMsg->pCont;
pRoot = syncPing2Json(pSyncMsg);
......@@ -73,6 +78,66 @@ cJSON* syncRpcUnknownMsg2Json() {
return pJson;
}
// ---- message process SyncTimeout----
SyncTimeout* syncTimeoutBuild() {
uint32_t bytes = sizeof(SyncTimeout);
SyncTimeout* pMsg = malloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->msgType = SYNC_TIMEOUT;
return pMsg;
}
void syncTimeoutDestroy(SyncTimeout* pMsg) {
if (pMsg != NULL) {
free(pMsg);
}
}
void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen) {
assert(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
}
void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg) {
memcpy(pMsg, buf, len);
assert(len == pMsg->bytes);
}
void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncTimeoutSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg) {
syncTimeoutDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
cJSON* syncTimeout2Json(const SyncTimeout* pMsg) {
char u64buf[128];
cJSON* pRoot = cJSON_CreateObject();
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON_AddNumberToObject(pRoot, "timeoutType", pMsg->timeoutType);
snprintf(u64buf, sizeof(u64buf), "%p", pMsg->data);
cJSON_AddStringToObject(pRoot, "data", u64buf);
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncTimeout", pRoot);
return pJson;
}
SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, void* data) {
SyncTimeout* pMsg = syncTimeoutBuild();
pMsg->timeoutType = timeoutType;
pMsg->data = data;
return pMsg;
}
// ---- message process SyncPing----
SyncPing* syncPingBuild(uint32_t dataLen) {
uint32_t bytes = SYNC_PING_FIX_LEN + dataLen;
......
......@@ -78,6 +78,7 @@ int main(int argc, char** argv) {
SSyncNode* pSyncNode = doSync(myIndex);
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
ret = syncNodeStartPingTimer(pSyncNode);
assert(ret == 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册