syncUtil.c 22.6 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "syncUtil.h"
18
#include "syncIndexMgr.h"
S
Shengliang Guan 已提交
19
#include "syncMessage.h"
20
#include "syncPipeline.h"
S
Shengliang Guan 已提交
21 22 23
#include "syncRaftCfg.h"
#include "syncRaftStore.h"
#include "syncSnapshot.h"
D
dmchen 已提交
24
#include "tglobal.h"
M
Minghao Li 已提交
25

26
void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) {
27
  int32_t len = snprintf(buf, bufLen, "{num:%d, as:%d, [", pCfg->replicaNum, pCfg->myIndex);
28
  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
29
    len += snprintf(buf + len, bufLen - len, "%s:%d", pCfg->nodeInfo[i].nodeFqdn, pCfg->nodeInfo[i].nodePort);
30
    if (i < pCfg->replicaNum - 1) {
31
      len += snprintf(buf + len, bufLen - len, "%s", ", ");
32
    }
33
  }
34
  len += snprintf(buf + len, bufLen - len, "%s", "]}");
M
Minghao Li 已提交
35 36
}

37
// Fill pEpSet with exactly one endpoint taken from pInfo: the node's FQDN and port are stored in
// eps[0], numOfEps is set to 1 and inUse to 0.
void syncUtilNodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet) {
  tstrncpy(pEpSet->eps[0].fqdn, pInfo->nodeFqdn, TSDB_FQDN_LEN);
  pEpSet->eps[0].port = pInfo->nodePort;

  pEpSet->numOfEps = 1;
  pEpSet->inUse = 0;
}

44
// Build the raft id of a node for the given group.
//   pInfo  - node description; nodeFqdn is resolved, the other fields are logged
//   vgId   - group id stored into raftId->vgId
//   raftId - output; addr is set from SYNC_ADDR(pInfo)
// The FQDN is resolved up to tsResolveFQDNRetryTime times, sleeping one second after each failed
// attempt. A result of 0xFFFFFFFF or 1 is treated as a failed resolution.
// Returns true on success; on failure returns false with terrno set to TSDB_CODE_TSC_INVALID_FQDN.
bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId) {
  sDebug("vgId:%d, start to resolve sync addr fqdn in %d seconds, "
         "dnode:%d cluster:%" PRId64 " fqdn:%s port:%u ",
         vgId, tsResolveFQDNRetryTime, pInfo->nodeId, pInfo->clusterId, pInfo->nodeFqdn, pInfo->nodePort);

  uint32_t addrV4 = 0xFFFFFFFF;
  bool     resolved = false;
  for (int attempt = 0; attempt < tsResolveFQDNRetryTime && !resolved; ++attempt) {
    addrV4 = taosGetIpv4FromFqdn(pInfo->nodeFqdn);
    resolved = !(addrV4 == 0xFFFFFFFF || addrV4 == 1);
    if (!resolved) {
      sError("failed to resolve ipv4 addr, fqdn:%s, wait one second", pInfo->nodeFqdn);
      taosSsleep(1);
    }
  }

  if (!resolved) {
    sError("failed to resolve ipv4 addr, fqdn:%s", pInfo->nodeFqdn);
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return false;
  }

  // the dotted form is only needed for the log line below
  char ipText[128] = {0};
  tinet_ntoa(ipText, addrV4);

  raftId->addr = SYNC_ADDR(pInfo);
  raftId->vgId = vgId;

  sInfo("vgId:%d, sync addr:%" PRIu64 ", dnode:%d cluster:%" PRId64 " fqdn:%s ip:%s port:%u ipv4:%u", vgId,
        raftId->addr, pInfo->nodeId, pInfo->clusterId, pInfo->nodeFqdn, ipText, pInfo->nodePort, addrV4);
  return true;
}

M
Minghao Li 已提交
77
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) {
78 79 80
  if (pId1->addr == pId2->addr && pId1->vgId == pId2->vgId) return true;
  if ((CID(pId1) == 0 || CID(pId2) == 0) && (DID(pId1) == DID(pId2)) && pId1->vgId == pId2->vgId) return true;
  return false;
M
Minghao Li 已提交
81
}
M
Minghao Li 已提交
82 83

// An id counts as empty when both its addr and its vgId are zero.
bool syncUtilEmptyId(const SRaftId* pId) {
  if (pId->addr != 0) return false;
  return pId->vgId == 0;
}
M
Minghao Li 已提交
84

S
Shengliang Guan 已提交
85
// Return taosRand() reduced modulo max.
// A non-positive max yields 0 instead of evaluating a remainder by zero (undefined behaviour)
// or by a negative bound.
static inline int32_t syncUtilRand(int32_t max) {
  if (max <= 0) return 0;
  return taosRand() % max;
}
M
Minghao Li 已提交
86

M
Minghao Li 已提交
87
// Pick a value in [min, max) as min plus syncUtilRand(max - min).
// When the range is empty or inverted (max <= min) there is nothing to randomize and min is returned,
// which also keeps a zero or negative bound from reaching the modulo inside syncUtilRand.
int32_t syncUtilElectRandomMS(int32_t min, int32_t max) {
  if (max <= min) return min;
  return min + syncUtilRand(max - min);
}
M
Minghao Li 已提交
93

M
Minghao Li 已提交
94 95
// Majority size for a group of replicaNum members: half of the group (integer division) plus one.
int32_t syncUtilQuorum(int32_t replicaNum) {
  const int32_t half = replicaNum / 2;
  return half + 1;
}

M
Minghao Li 已提交
96 97 98 99 100 101
void syncUtilMsgHtoN(void* msg) {
  SMsgHead* pHead = msg;
  pHead->contLen = htonl(pHead->contLen);
  pHead->vgId = htonl(pHead->vgId);
}

S
Shengliang Guan 已提交
102
// True for every message type except TDMT_SYNC_NOOP and TDMT_SYNC_LEADER_TRANSFER.
bool syncUtilUserPreCommit(tmsg_t msgType) {
  const bool excluded = (msgType == TDMT_SYNC_NOOP) || (msgType == TDMT_SYNC_LEADER_TRANSFER);
  return !excluded;
}
M
Minghao Li 已提交
103

S
Shengliang Guan 已提交
104
// True for every message type except TDMT_SYNC_NOOP and TDMT_SYNC_LEADER_TRANSFER.
bool syncUtilUserRollback(tmsg_t msgType) {
  const bool excluded = (msgType == TDMT_SYNC_NOOP) || (msgType == TDMT_SYNC_LEADER_TRANSFER);
  return !excluded;
}
M
Minghao Li 已提交
105

106 107
// for leader
// Format "{0:ts,1:ts,...}" into buf, where each ts is the value syncIndexMgrGetRecvTime returns
// for replica i when looked up in pSyncNode->pMatchIndex.
//   buf    - destination buffer
//   bufLen - capacity of buf in bytes
// Nothing is written when buf is NULL or bufLen is not positive. Formatting stops at the first
// truncated or failed snprintf so the write position can never pass the end of buf.
static void syncHearbeatReplyTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
  if (buf == NULL || bufLen <= 0) return;
  buf[0] = '\0';

  int32_t len = 0;
  int32_t n = snprintf(buf, bufLen, "%s", "{");
  if (n < 0 || n >= bufLen) return;
  len = n;

  for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
    int64_t     tsMs = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->replicasId[i]));
    const char* sep = (i < pSyncNode->replicaNum - 1) ? "," : "";
    n = snprintf(buf + len, bufLen - len, "%d:%" PRId64 "%s", i, tsMs, sep);
    if (n < 0 || n >= bufLen - len) return;
    len += n;
  }

  (void)snprintf(buf + len, bufLen - len, "%s", "}");
}

// for follower
// Format "{0:ts,1:ts,...}" into buf, where each ts is the value syncIndexMgrGetRecvTime returns
// for replica i when looked up in pSyncNode->pNextIndex.
//   buf    - destination buffer
//   bufLen - capacity of buf in bytes
// Nothing is written when buf is NULL or bufLen is not positive. Formatting stops at the first
// truncated or failed snprintf so the write position can never pass the end of buf.
static void syncHearbeatTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
  if (buf == NULL || bufLen <= 0) return;
  buf[0] = '\0';

  int32_t len = 0;
  int32_t n = snprintf(buf, bufLen, "%s", "{");
  if (n < 0 || n >= bufLen) return;
  len = n;

  for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
    int64_t     tsMs = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->replicasId[i]));
    const char* sep = (i < pSyncNode->replicaNum - 1) ? "," : "";
    n = snprintf(buf + len, bufLen - len, "%d:%" PRId64 "%s", i, tsMs, sep);
    if (n < 0 || n >= bufLen - len) return;
    len += n;
  }

  (void)snprintf(buf + len, bufLen - len, "%s", "}");
}

// Format the log buffer's indexes into buf as "[start commit match, end)".
// buf is left untouched when the node has no log buffer (pLogBuf == NULL).
static void syncLogBufferStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
  SSyncLogBuffer* pLogBuf = pSyncNode->pLogBuf;
  if (pLogBuf != NULL) {
    (void)snprintf(buf, bufLen, "[%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pLogBuf->startIndex,
                   pLogBuf->commitIndex, pLogBuf->matchIndex, pLogBuf->endIndex);
  }
}

144
// Format the per-replica log replication managers into buf as
// "{0:restored [start match, end), 1:..., ...}".
//   buf    - destination buffer
//   bufLen - capacity of buf in bytes
// Iteration ends at the first NULL entry in logReplMgrs; the closing brace is still appended.
// Nothing is written when buf is NULL or bufLen is not positive. Formatting stops at the first
// truncated or failed snprintf so the write position can never pass the end of buf.
static void syncLogReplStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
  if (buf == NULL || bufLen <= 0) return;
  buf[0] = '\0';

  int32_t len = 0;
  int32_t n = snprintf(buf, bufLen, "%s", "{");
  if (n < 0 || n >= bufLen) return;
  len = n;

  for (int32_t i = 0; i < pSyncNode->replicaNum; i++) {
    SSyncLogReplMgr* pMgr = pSyncNode->logReplMgrs[i];
    if (pMgr == NULL) break;

    const char* sep = (i + 1 < pSyncNode->replicaNum) ? ", " : "";
    n = snprintf(buf + len, bufLen - len, "%d:%d [%" PRId64 " %" PRId64 ", %" PRId64 ")%s", i, pMgr->restored,
                 pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, sep);
    if (n < 0 || n >= bufLen - len) return;
    len += n;
  }

  (void)snprintf(buf + len, bufLen - len, "%s", "}");
}

159
// Format the peer states into buf as "{0:lastSendIndex lastSendTime, 1:..., ...}".
//   buf    - destination buffer
//   bufLen - capacity of buf in bytes
// Iteration ends at the first replica for which syncNodeGetPeerState returns NULL; the closing
// brace is still appended.
// Nothing is written when buf is NULL or bufLen is not positive. Formatting stops at the first
// truncated or failed snprintf so the write position can never pass the end of buf.
static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
  if (buf == NULL || bufLen <= 0) return;
  buf[0] = '\0';

  int32_t len = 0;
  int32_t n = snprintf(buf, bufLen, "%s", "{");
  if (n < 0 || n >= bufLen) return;
  len = n;

  for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
    SPeerState* pState = syncNodeGetPeerState(pSyncNode, &(pSyncNode->replicasId[i]));
    if (pState == NULL) break;

    n = snprintf(buf + len, bufLen - len, "%d:%" PRId64 " %" PRId64 "%s", i, pState->lastSendIndex,
                 pState->lastSendTime, (i < pSyncNode->replicaNum - 1) ? ", " : "");
    if (n < 0 || n >= bufLen - len) return;
    len += n;
  }

  (void)snprintf(buf + len, bufLen - len, "%s", "}");
}

S
Shengliang Guan 已提交
171
// Write one log record describing the current state of pNode, prefixed with a caller-formatted
// event message.
//   flags, level, dflag - forwarded unchanged to taosPrintLog
//   pNode               - node to describe; nothing is logged when it or its pLogStore is NULL
//   format, ...         - printf-style event message, formatted into a 512-byte buffer (truncated
//                         if longer, empty if formatting fails)
// terrno is captured on entry and restored once all state has been gathered, so the calls made here
// do not change the error code the caller sees.
// The FSM callbacks are optional: when pFsm or a callback is NULL the corresponding value keeps its
// default (snapshot index -1 / term 0, applied index SYNC_INDEX_INVALID, apply-queue items 0).
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) {
  if (pNode == NULL || pNode->pLogStore == NULL) return;

  // save error code, otherwise it will be overwritten
  int32_t errCode = terrno;

  int64_t currentTerm = raftStoreGetTerm(pNode);

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
    pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  }

  // pLogStore was verified non-NULL above
  SyncIndex logLastIndex = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  SyncIndex logBeginIndex = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);

  char cfgStr[1024] = "";
  syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr));

  char replMgrStatesStr[1024] = "";
  syncLogReplStates2Str(pNode, replMgrStatesStr, sizeof(replMgrStatesStr));

  char bufferStatesStr[256] = "";
  syncLogBufferStates2Str(pNode, bufferStatesStr, sizeof(bufferStatesStr));

  char hbrTimeStr[256] = "";
  syncHearbeatReplyTime2Str(pNode, hbrTimeStr, sizeof(hbrTimeStr));

  char hbTimeStr[256] = "";
  syncHearbeatTime2Str(pNode, hbTimeStr, sizeof(hbTimeStr));

  char    eventLog[512];
  va_list argpointer;
  va_start(argpointer, format);
  if (vsnprintf(eventLog, sizeof(eventLog), format, argpointer) < 0) {
    eventLog[0] = '\0';  // never hand an unterminated buffer to the logger
  }
  va_end(argpointer);

  int32_t   aqItems = 0;
  SyncIndex appliedIndex = SYNC_INDEX_INVALID;
  if (pNode->pFsm != NULL) {
    if (pNode->pFsm->FpApplyQueueItems != NULL) {
      aqItems = pNode->pFsm->FpApplyQueueItems(pNode->pFsm);
    }
    if (pNode->pFsm->FpAppliedIndexCb != NULL) {
      appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm);
    }
  }

  // restore error code only after the last callback has run
  terrno = errCode;

  taosPrintLog(flags, level, dflag,
               "vgId:%d, %s, sync:%s, term:%" PRIu64 ", commit-index:%" PRId64 ", applied-index:%" PRId64
               ", first-ver:%" PRId64 ", last-ver:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", snap-term:%" PRIu64
               ", elect-times:%d, as-leader-times:%d, cfg-ch-times:%d, hb-slow:%d, hbr-slow:%d, "
               "aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64
               ", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64
               ", buffer:%s, repl-mgrs:%s, members:%s, hb:%s, hb-reply:%s",
               pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, appliedIndex,
               logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
               pNode->electNum, pNode->becomeLeaderNum, pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum,
               aqItems, pNode->snapshottingIndex, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing,
               pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock,
               pNode->heartbeatTimerLogicClockUser, bufferStatesStr, replMgrStatesStr, cfgStr, hbTimeStr, hbrTimeStr);
}

// Write one log record describing a snapshot sender and the node it belongs to, prefixed with a
// caller-formatted event message.
//   flags, level, dflag - forwarded unchanged to taosPrintLog
//   pSender             - sender to describe; nothing is logged when it, its pSyncNode, or that
//                         node's pLogStore is NULL
//   format, ...         - printf-style event message, formatted into a 512-byte buffer (truncated
//                         if longer, empty if formatting fails)
// terrno is captured on entry and restored before returning, matching syncPrintNodeLog, so that
// logging does not change the error code the caller sees.
void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender,
                                const char* format, ...) {
  if (pSender == NULL) return;
  SSyncNode* pNode = pSender->pSyncNode;
  if (pNode == NULL || pNode->pLogStore == NULL) return;

  // save error code, otherwise it will be overwritten
  int32_t errCode = terrno;

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
    pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  }

  // pLogStore was verified non-NULL above
  SyncIndex logLastIndex = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  SyncIndex logBeginIndex = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);

  char cfgStr[1024] = "";
  syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr));

  char peerStr[1024] = "";
  syncPeerState2Str(pNode, peerStr, sizeof(peerStr));

  char    eventLog[512];
  va_list argpointer;
  va_start(argpointer, format);
  if (vsnprintf(eventLog, sizeof(eventLog), format, argpointer) < 0) {
    eventLog[0] = '\0';  // never hand an unterminated buffer to the logger
  }
  va_end(argpointer);

  taosPrintLog(flags, level, dflag,
               "vgId:%d, %s, sync:%s, snap-sender:{%p start:%" PRId64 " end:%" PRId64 " last-index:%" PRId64
               " last-term:%" PRIu64 " last-cfg:%" PRId64
               ", seq:%d ack:%d finish:%d, as:%d dnode:%d}"
               ", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64
               ", min-match:%" PRId64 ", snap:{last-index:%" PRId64 ", term:%" PRIu64
               "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64
               ", chging:%d, restore:%d, quorum:%d, lc-timer:{elect:%" PRId64 ", hb:%" PRId64 "}, peer:%s, cfg:%s",
               pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->snapshotParam.start,
               pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
               pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->replicaIndex,
               DID(&pNode->replicasId[pSender->replicaIndex]), raftStoreGetTerm(pNode), pNode->commitIndex,
               logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
               pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, pNode->replicaNum, pNode->raftCfg.lastConfigIndex,
               pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock,
               pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);

  // restore error code
  terrno = errCode;
}

// Write one log record describing a snapshot receiver and the node it belongs to, prefixed with a
// caller-formatted event message.
//   flags, level, dflag - forwarded unchanged to taosPrintLog
//   pReceiver           - receiver to describe; nothing is logged when it, its pSyncNode, or that
//                         node's pLogStore is NULL
//   format, ...         - printf-style event message, formatted into a 512-byte buffer (truncated
//                         if longer, empty if formatting fails)
// terrno is captured on entry and restored before returning, matching syncPrintNodeLog, so that
// logging does not change the error code the caller sees.
void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver,
                                  const char* format, ...) {
  if (pReceiver == NULL) return;
  SSyncNode* pNode = pReceiver->pSyncNode;
  if (pNode == NULL || pNode->pLogStore == NULL) return;

  // save error code, otherwise it will be overwritten
  int32_t errCode = terrno;

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
    pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  }

  // pLogStore was verified non-NULL above
  SyncIndex logLastIndex = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  SyncIndex logBeginIndex = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);

  char cfgStr[1024] = "";
  syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr));

  char peerStr[1024] = "";
  syncPeerState2Str(pNode, peerStr, sizeof(peerStr));

  char    eventLog[512];
  va_list argpointer;
  va_start(argpointer, format);
  if (vsnprintf(eventLog, sizeof(eventLog), format, argpointer) < 0) {
    eventLog[0] = '\0';  // never hand an unterminated buffer to the logger
  }
  va_end(argpointer);

  taosPrintLog(
      flags, level, dflag,
      "vgId:%d, %s, sync:%s,"
      " snap-receiver:{%p started:%d acked:%d term:%" PRIu64 " start-time:%" PRId64 " from-dnode:%d, start:%" PRId64
      " end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64
      "}"
      ", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64 ", min-match:%" PRId64
      ", snap:{last-index:%" PRId64 ", last-term:%" PRIu64 "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64
      ", chging:%d, restore:%d, quorum:%d, lc-timers:{elect:%" PRId64 ", hb:%" PRId64 "}, peer:%s, cfg:%s",
      pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term,
      pReceiver->startTime, DID(&pReceiver->fromId), pReceiver->snapshotParam.start, pReceiver->snapshotParam.end,
      pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex,
      raftStoreGetTerm(pNode), pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex,
      snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize,
      pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish,
      syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);

  // restore error code
  terrno = errCode;
}
S
Shengliang Guan 已提交
333 334

// Log a received sync timer message through sNTrace, including the time elapsed between the
// message's timeStamp and now. Does nothing unless DEBUG_TRACE is set in sDebugFlag.
//   s - free-form text appended to the record
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
  if ((sDebugFlag & DEBUG_TRACE) == 0) return;

  const int64_t nowMs = taosGetTimestampMs();
  const int64_t elapsedMs = nowMs - pMsg->timeStamp;

  sNTrace(
      pSyncNode, "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, ts:%" PRId64 ", elapsed:%" PRId64 ", data:%p}, %s",
      syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->timeStamp, elapsedMs, pMsg->data, s);
}

// Log a received local command through sNTrace: the numeric command, its name as returned by
// syncLocalCmdGetStr, and the message's currentTerm and commitIndex.
//   s - free-form text appended to the record
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) {
  sNTrace(pSyncNode,
          "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s",
          pMsg->cmd,
          syncLocalCmdGetStr(pMsg->cmd),
          pMsg->currentTerm,
          pMsg->commitIndex,
          s);
}

// Log an outgoing append-entries reply through sNTrace, identifying the peer by DID() of destId.
//   s - free-form text appended to the record
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
  const int32_t toDnode = DID(&pMsg->destId);

  sNTrace(pSyncNode,
          "send sync-append-entries-reply to dnode:%d, {term:%" PRId64 ", pterm:%" PRId64
          ", success:%d, lsend-index:%" PRId64 ", match:%" PRId64 "}, %s",
          toDnode, pMsg->term, pMsg->lastMatchTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s);
}

// Log an incoming append-entries reply through sNTrace, identifying the peer by DID() of srcId.
//   s - free-form text appended to the record
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
  const int32_t fromDnode = DID(&pMsg->srcId);

  sNTrace(pSyncNode,
          "recv sync-append-entries-reply from dnode:%d {term:%" PRId64 ", pterm:%" PRId64
          ", success:%d, lsend-index:%" PRId64 ", match:%" PRId64 "}, %s",
          fromDnode, pMsg->term, pMsg->lastMatchTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s);
}

363 364
// Log an outgoing heartbeat through sNTrace, identifying the peer by DID() of destId.
//   printX       - when true the record ends in ", x" and the timing values are not printed
//   timerElapsed - printed as "timer-elapsed" when printX is false
//   execTime     - printed as "next-exec" when printX is false
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed,
                          int64_t execTime) {
  const int32_t toDnode = DID(&pMsg->destId);

  if (!printX) {
    sNTrace(pSyncNode,
            "send sync-heartbeat to dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
            ", ts:%" PRId64 "}, timer-elapsed:%" PRId64 ", next-exec:%" PRId64,
            toDnode, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timerElapsed, execTime);
    return;
  }

  sNTrace(pSyncNode,
          "send sync-heartbeat to dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
          ", ts:%" PRId64 "}, x",
          toDnode, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp);
}

M
Minghao Li 已提交
379
// Log an incoming heartbeat through sNTrace, identifying the peer by DID() of srcId.
//   timeDiff - printed as "net elapsed"; when it exceeds SYNC_HEARTBEAT_SLOW_MS the node's
//              hbSlowNum counter is incremented and an extra record marked "slow" is written
//              before the regular one
//   s        - free-form text included in the record
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff, const char* s) {
  const int32_t fromDnode = DID(&pMsg->srcId);
  const bool    isSlow = timeDiff > SYNC_HEARTBEAT_SLOW_MS;

  if (isSlow) {
    pSyncNode->hbSlowNum++;

    sNTrace(pSyncNode,
            "recv sync-heartbeat from dnode:%d slow {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
            ", ts:%" PRId64 "}, %s, net elapsed:%" PRId64,
            fromDnode, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff);
  }

  // the regular record is written for slow heartbeats as well
  sNTrace(pSyncNode,
          "recv sync-heartbeat from dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
          ", ts:%" PRId64 "}, %s, net elapsed:%" PRId64,
          fromDnode, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff);
}

// Log an outgoing heartbeat reply through sNTrace.
//   pMsg - reply being sent; its destId (via DID), term and timeStamp are printed
//   s    - free-form text appended to the record
// The peer printed is the destination of the reply, so the record says "to dnode", in line with the
// other send-side log helpers in this file.
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
  sNTrace(pSyncNode, "send sync-heartbeat-reply to dnode:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s",
          DID(&pMsg->destId), pMsg->term, pMsg->timeStamp, s);
}

M
Minghao Li 已提交
400
// Log an incoming heartbeat reply through sNTrace, identifying the peer by DID() of srcId.
//   timeDiff - printed as "net elapsed"; when it exceeds SYNC_HEARTBEAT_REPLY_SLOW_MS the node's
//              hbrSlowNum counter is incremented and an extra record marked "slow" is written
//              before the regular one
//   s        - free-form text included in the record
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, int64_t timeDiff, const char* s) {
  const int32_t fromDnode = DID(&pMsg->srcId);
  const bool    isSlow = timeDiff > SYNC_HEARTBEAT_REPLY_SLOW_MS;

  if (isSlow) {
    pSyncNode->hbrSlowNum++;

    sNTrace(pSyncNode,
            "recv sync-heartbeat-reply from dnode:%d slow {term:%" PRId64 ", ts:%" PRId64 "}, %s, net elapsed:%" PRId64,
            fromDnode, pMsg->term, pMsg->timeStamp, s, timeDiff);
  }

  // the regular record is written for slow replies as well
  sNTrace(pSyncNode,
          "recv sync-heartbeat-reply from dnode:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s, net elapsed:%" PRId64,
          fromDnode, pMsg->term, pMsg->timeStamp, s, timeDiff);
}

// Log an outgoing snapshot-send message through sNDebug, identifying the peer by DID() of destId.
//   s - free-form text included in the record
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
  const int32_t toDnode = DID(&pMsg->destId);

  sNDebug(pSyncNode,
          "send sync-snapshot-send to dnode:%d, %s, seq:%d, term:%" PRId64 ", begin-index:%" PRId64
          ", last-index:%" PRId64 ", last-term:%" PRId64 ", start-time:%" PRId64,
          toDnode, s, pMsg->seq, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime);
}

// Log an incoming snapshot-send message through sNDebug, identifying the peer by DID() of srcId
// and including the message's dataLen.
//   s - free-form text included in the record
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
  const int32_t fromDnode = DID(&pMsg->srcId);

  sNDebug(pSyncNode,
          "recv sync-snapshot-send from dnode:%d, %s, seq:%d, term:%" PRId64 ", begin-index:%" PRId64
          ", last-index:%" PRId64 ", last-term:%" PRId64 ", start-time:%" PRId64 ", data-len:%u",
          fromDnode, s, pMsg->seq, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime,
          pMsg->dataLen);
}

// Log an outgoing snapshot response through sNDebug, identifying the peer by DID() of destId.
//   s - free-form text included in the record
void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
  const int32_t toDnode = DID(&pMsg->destId);

  sNDebug(pSyncNode,
          "send sync-snapshot-rsp to dnode:%d, %s, acked:%d, term:%" PRId64 ", begin-index:%" PRId64
          ", last-index:%" PRId64 ", last-term:%" PRId64 ", start-time:%" PRId64,
          toDnode, s, pMsg->ack, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime);
}

// Log an incoming snapshot response through sNDebug, identifying the peer by DID() of srcId.
//   s - free-form text included in the record
void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
  const int32_t fromDnode = DID(&pMsg->srcId);

  sNDebug(pSyncNode,
          "recv sync-snapshot-rsp from dnode:%d, %s, ack:%d, term:%" PRId64 ", begin-index:%" PRId64
          ", last-index:%" PRId64 ", last-term:%" PRId64 ", start-time:%" PRId64,
          fromDnode, s, pMsg->ack, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime);
}

// Log an incoming append-entries message through sNTrace, identifying the peer by DID() of srcId.
//   s - free-form text appended to the record
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
  const int32_t fromDnode = DID(&pMsg->srcId);

  sNTrace(pSyncNode,
          "recv sync-append-entries from dnode:%d {term:%" PRId64 ", prev-log:{index:%" PRId64 ", term:%" PRId64
          "}, commit-index:%" PRId64 ", datalen:%d}, %s",
          fromDnode, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->dataLen, s);
}

// Log an outgoing append-entries message through sNTrace, identifying the peer by DID() of destId.
// The "index" field of the record is printed as prevLogIndex + 1.
//   s - free-form text appended to the record
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
  const int32_t toDnode = DID(&pMsg->destId);

  sNTrace(pSyncNode,
          "send sync-append-entries to dnode:%d, {term:%" PRId64 ", prev-log:{index:%" PRId64 ", term:%" PRId64
          "}, index:%" PRId64 ", commit-index:%" PRId64 ", datalen:%d}, %s",
          toDnode, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, (pMsg->prevLogIndex + 1), pMsg->commitIndex,
          pMsg->dataLen, s);
}

461 462 463 464 465 466 467 468
// Log an incoming request-vote message through sNInfo, identifying the peer by DID() of srcId.
//   voteGranted - when it is -1 the record ends with errmsg; otherwise it ends with
//                 "granted:<voteGranted>"
//   errmsg      - text used only when voteGranted is -1
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, int32_t voteGranted,
                            const char* errmsg) {
  char grantedText[64];
  snprintf(grantedText, sizeof(grantedText), "granted:%d", voteGranted);

  const char* outcome = errmsg;
  if (voteGranted != -1) {
    outcome = grantedText;
  }

  sNInfo(pSyncNode,
         "recv sync-request-vote from dnode:%d, {term:%" PRId64 ", last-index:%" PRId64 ", last-term:%" PRId64 "}, %s",
         DID(&pMsg->srcId), pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, outcome);
}

// Log an outgoing request-vote message through sNInfo, identifying the peer by DID() of destId.
//   s - free-form text appended to the record
void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s) {
  const int32_t toDnode = DID(&pMsg->destId);

  sNInfo(pNode,
         "send sync-request-vote to dnode:%d {term:%" PRId64 ", last-index:%" PRId64 ", last-term:%" PRId64 "}, %s",
         toDnode, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
}

// Log an incoming request-vote reply through sNInfo, identifying the peer by DID() of srcId.
//   s - free-form text appended to the record
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
  const int32_t fromDnode = DID(&pMsg->srcId);

  sNInfo(pSyncNode, "recv sync-request-vote-reply from dnode:%d {term:%" PRId64 ", grant:%d}, %s", fromDnode,
         pMsg->term, pMsg->voteGranted, s);
}

// Log an outgoing request-vote reply through sNInfo, identifying the peer by DID() of destId.
//   s - free-form text appended to the record
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
  const int32_t toDnode = DID(&pMsg->destId);

  sNInfo(pSyncNode, "send sync-request-vote-reply to dnode:%d {term:%" PRId64 ", grant:%d}, %s", toDnode,
         pMsg->term, pMsg->voteGranted, s);
}