syncUtil.c 25.7 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "syncUtil.h"
18
#include "syncIndexMgr.h"
S
Shengliang Guan 已提交
19
#include "syncMessage.h"
S
Shengliang Guan 已提交
20 21 22
#include "syncRaftCfg.h"
#include "syncRaftStore.h"
#include "syncSnapshot.h"
M
Minghao Li 已提交
23

S
Shengliang Guan 已提交
24
extern void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port);
M
Minghao Li 已提交
25

M
Minghao Li 已提交
26
uint64_t syncUtilAddr2U64(const char* host, uint16_t port) {
27 28
  uint32_t hostU32 = taosGetIpv4FromFqdn(host);
  if (hostU32 == (uint32_t)-1) {
S
Shengliang Guan 已提交
29
    sError("failed to resolve ipv4 addr, host:%s", host);
30
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
31 32 33
    return -1;
  }

S
Shengliang Guan 已提交
34
  uint64_t u64 = (((uint64_t)hostU32) << 32) | (((uint32_t)port) << 16);
M
Minghao Li 已提交
35 36
  return u64;
}
M
Minghao Li 已提交
37

S
Shengliang Guan 已提交
38
// Split a packed 64-bit address: the upper 32 bits are handed to taosInetNtoa
// as an in_addr to fill host (len is forwarded as the size of host), and
// bits 16..31 are stored into *port.
void syncUtilU642Addr(uint64_t u64, char* host, int64_t len, uint16_t* port) {
  struct in_addr inAddr = {.s_addr = (uint32_t)((u64 >> 32) & 0x00000000FFFFFFFF)};
  taosInetNtoa(inAddr, host, len);

  const uint64_t portBits = u64 & 0x00000000FFFF0000;
  *port = (uint16_t)(portBits >> 16);
}

46
// Reset pEpSet (no endpoints, inUse 0) and add the fqdn/port pair from pInfo.
void syncUtilNodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;
  pEpSet->inUse = 0;

  addEpIntoEpSet(pEpSet, pInfo->nodeFqdn, pInfo->nodePort);
}

52
// Decode raftId->addr into host text and port, then reset pEpSet and add that
// single endpoint to it.
void syncUtilRaftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) {
  uint16_t peerPort = 0;
  char     peerHost[TSDB_FQDN_LEN] = {0};
  syncUtilU642Addr(raftId->addr, peerHost, sizeof(peerHost), &peerPort);

  pEpSet->numOfEps = 0;
  pEpSet->inUse = 0;
  addEpIntoEpSet(pEpSet, peerHost, peerPort);
}

62
// Fill raftId from a node description: the node fqdn is looked up with
// taosGetIpv4FromFqdn, rendered as text by tinet_ntoa and packed together with
// the node port by syncUtilAddr2U64; vgId is copied as-is.
// Returns false (terrno = TSDB_CODE_TSC_INVALID_FQDN) when the lookup yields
// 0xFFFFFFFF or 1; returns true otherwise.
bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId) {
  const uint32_t resolved = taosGetIpv4FromFqdn(pInfo->nodeFqdn);
  const bool     lookupFailed = (resolved == 0xFFFFFFFF) || (resolved == 1);

  if (lookupFailed) {
    sError("failed to resolve ipv4 addr, fqdn: %s", pInfo->nodeFqdn);
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return false;
  }

  char ipText[128] = {0};
  tinet_ntoa(ipText, resolved);

  raftId->addr = syncUtilAddr2U64(ipText, pInfo->nodePort);
  raftId->vgId = vgId;
  return true;
}

M
Minghao Li 已提交
77
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) {
78
  return pId1->addr == pId2->addr && pId1->vgId == pId2->vgId;
M
Minghao Li 已提交
79
}
M
Minghao Li 已提交
80 81

// A raft id is empty when both addr and vgId are zero.
bool syncUtilEmptyId(const SRaftId* pId) {
  const bool noAddr = (pId->addr == 0);
  const bool noGroup = (pId->vgId == 0);
  return noAddr && noGroup;
}
M
Minghao Li 已提交
82

S
Shengliang Guan 已提交
83
// Reduce taosRand() modulo max. max must be non-zero.
static inline int32_t syncUtilRand(int32_t max) {
  return taosRand() % max;
}
M
Minghao Li 已提交
84

M
Minghao Li 已提交
85
// Pick a random election timeout: min plus syncUtilRand(max - min).
// When the range is empty or inverted (max <= min) min is returned directly,
// because syncUtilRand() takes a modulo by (max - min) and a zero divisor is
// undefined behavior.
int32_t syncUtilElectRandomMS(int32_t min, int32_t max) {
  if (max <= min) {
    return min;
  }

  return min + syncUtilRand(max - min);
}
M
Minghao Li 已提交
91

M
Minghao Li 已提交
92 93 94
// Majority size for a replica group: half of replicaNum (integer division) plus one.
int32_t syncUtilQuorum(int32_t replicaNum) {
  const int32_t half = replicaNum / 2;
  return half + 1;
}

// Build {"SRaftId": {"addr": "<u64 as decimal string>", "host": ..., "port": ..., "vgId": ...}}
// for the given raft id. host/port are decoded from p->addr by syncUtilU642Addr.
// Returns the new object, or NULL if either cJSON object cannot be created
// (the partially built inner object is released in that case).
cJSON* syncUtilRaftId2Json(const SRaftId* p) {
  cJSON* pRoot = cJSON_CreateObject();
  if (pRoot == NULL) {
    return NULL;
  }

  // addr is emitted as a string so the full 64-bit value is kept verbatim
  char u64buf[128] = {0};
  snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", p->addr);
  cJSON_AddStringToObject(pRoot, "addr", u64buf);

  char     host[128] = {0};
  uint16_t port = 0;
  syncUtilU642Addr(p->addr, host, sizeof(host), &port);
  cJSON_AddStringToObject(pRoot, "host", host);
  cJSON_AddNumberToObject(pRoot, "port", port);
  cJSON_AddNumberToObject(pRoot, "vgId", p->vgId);

  cJSON* pJson = cJSON_CreateObject();
  if (pJson == NULL) {
    // pRoot has no owner yet, free it here to avoid a leak
    cJSON_Delete(pRoot);
    return NULL;
  }

  cJSON_AddItemToObject(pJson, "SRaftId", pRoot);
  return pJson;
}

S
Shengliang Guan 已提交
112
// True when c lies in the range 32..126 inclusive (space through '~').
static inline bool syncUtilCanPrint(char c) {
  return (c >= 32) && (c <= 126);
}

S
Shengliang Guan 已提交
120
// Return a newly allocated, NUL-terminated copy of the first len bytes of ptr
// in which every byte rejected by syncUtilCanPrint() is replaced by '.'.
// The buffer comes from taosMemoryMalloc; the caller releases it.
// Returns NULL if the allocation fails and ASSERT does not stop execution.
char* syncUtilPrintBin(char* ptr, uint32_t len) {
  // widen before adding 1 so len == UINT32_MAX cannot wrap to a zero-sized buffer
  int64_t memLen = (int64_t)len + 1;
  char*   s = taosMemoryMalloc(memLen);
  ASSERT(s != NULL);
  if (s == NULL) {
    return NULL;
  }

  memcpy(s, ptr, len);
  s[len] = '\0';

  // unsigned index: matches the type of len, no signed/unsigned comparison
  for (uint32_t i = 0; i < len; ++i) {
    if (!syncUtilCanPrint(s[i])) {
      s[i] = '.';
    }
  }
  return s;
}

S
Shengliang Guan 已提交
135
// Return a newly allocated string listing the first len bytes of ptr as decimal
// numbers, each followed by ',' (e.g. "1,2,-3,"). The buffer comes from
// taosMemoryMalloc; the caller releases it.
// Returns NULL if the allocation fails and ASSERT does not stop execution.
char* syncUtilPrintBin2(char* ptr, uint32_t len) {
  // Worst case per byte is 5 characters ("-128,") when char is signed, so
  // budget 5 per element plus the terminator. Computed in 64 bits so a large
  // len cannot wrap the size.
  int64_t cap = (int64_t)len * 5 + 1;
  char*   s = taosMemoryMalloc(cap);
  ASSERT(s != NULL);
  if (s == NULL) {
    return NULL;
  }
  memset(s, 0, cap);

  char*   p = s;
  int64_t remain = cap;
  for (uint32_t i = 0; i < len; ++i) {
    int32_t n = snprintf(p, (size_t)remain, "%d,", ptr[i]);
    if (n < 0 || n >= remain) {
      break;  // formatting error or no room left; keep what has been written
    }
    p += n;
    remain -= n;
  }
  return s;
}

M
Minghao Li 已提交
149 150 151 152 153 154 155 156 157 158
void syncUtilMsgHtoN(void* msg) {
  SMsgHead* pHead = msg;
  pHead->contLen = htonl(pHead->contLen);
  pHead->vgId = htonl(pHead->vgId);
}

void syncUtilMsgNtoH(void* msg) {
  SMsgHead* pHead = msg;
  pHead->contLen = ntohl(pHead->contLen);
  pHead->vgId = ntohl(pHead->vgId);
M
Minghao Li 已提交
159 160
}

S
Shengliang Guan 已提交
161
// True for every message type except TDMT_SYNC_NOOP and TDMT_SYNC_LEADER_TRANSFER.
bool syncUtilUserPreCommit(tmsg_t msgType) {
  const bool isInternal = (msgType == TDMT_SYNC_NOOP) || (msgType == TDMT_SYNC_LEADER_TRANSFER);
  return !isInternal;
}
M
Minghao Li 已提交
162

S
Shengliang Guan 已提交
163
// True for every message type except TDMT_SYNC_NOOP and TDMT_SYNC_LEADER_TRANSFER.
bool syncUtilUserRollback(tmsg_t msgType) {
  if (msgType == TDMT_SYNC_NOOP) {
    return false;
  }
  return msgType != TDMT_SYNC_LEADER_TRANSFER;
}
M
Minghao Li 已提交
164

165 166 167 168 169 170 171 172 173 174 175 176
// Render pCfg into buf as "{r-num:N, my:I, fqdn:port, ..., fqdn:port}".
// At most bufLen bytes (terminator included) are written; when the text does
// not fit it is truncated and the remaining replicas are skipped.
void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) {
  if (buf == NULL || bufLen <= 0) {
    return;
  }

  int32_t len = snprintf(buf, bufLen, "{r-num:%d, my:%d, ", pCfg->replicaNum, pCfg->myIndex);

  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    // snprintf returns the length it WOULD have written, so after a truncation
    // len can reach or exceed bufLen; buf + len would then point outside the
    // buffer and bufLen - len would turn into a huge size_t.
    if (len < 0 || len >= bufLen) {
      break;
    }

    int32_t n;
    if (i < pCfg->replicaNum - 1) {
      n = snprintf(buf + len, bufLen - len, "%s:%d, ", pCfg->nodeInfo[i].nodeFqdn, pCfg->nodeInfo[i].nodePort);
    } else {
      n = snprintf(buf + len, bufLen - len, "%s:%d}", pCfg->nodeInfo[i].nodeFqdn, pCfg->nodeInfo[i].nodePort);
    }
    if (n < 0) {
      break;
    }
    len += n;
  }
}

177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206
// For leader: append "<i>:<recvTimeMs>" for every replica to buf, using the
// receive times stored in pSyncNode->pMatchIndex. Writing starts at offset 5,
// i.e. after the prefix the caller has already placed in buf ("hbr:{" in
// syncPrintNodeLog). Entries are separated by ',' and the last one is closed
// with '}'. Stops early once buf is full.
static void syncHearbeatReplyTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
  int32_t len = 5;

  for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
    // snprintf reports the untruncated length, so len may reach or pass bufLen;
    // stop before buf + len / bufLen - len become invalid.
    if (len >= bufLen) {
      break;
    }

    int64_t tsMs = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->replicasId[i]));

    int32_t n;
    if (i < pSyncNode->replicaNum - 1) {
      n = snprintf(buf + len, bufLen - len, "%d:%" PRId64 ",", i, tsMs);
    } else {
      n = snprintf(buf + len, bufLen - len, "%d:%" PRId64 "}", i, tsMs);
    }
    if (n < 0) {
      break;
    }
    len += n;
  }
}

// For follower: append "<i>:<recvTimeMs>" for every replica to buf, using the
// receive times stored in pSyncNode->pNextIndex. Writing starts at offset 4,
// i.e. after the prefix the caller has already placed in buf ("hb:{" in
// syncPrintNodeLog). Entries are separated by ',' and the last one is closed
// with '}'. Stops early once buf is full.
static void syncHearbeatTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
  int32_t len = 4;

  for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
    // snprintf reports the untruncated length, so len may reach or pass bufLen;
    // stop before buf + len / bufLen - len become invalid.
    if (len >= bufLen) {
      break;
    }

    int64_t tsMs = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->replicasId[i]));

    int32_t n;
    if (i < pSyncNode->replicaNum - 1) {
      n = snprintf(buf + len, bufLen - len, "%d:%" PRId64 ",", i, tsMs);
    } else {
      n = snprintf(buf + len, bufLen - len, "%d:%" PRId64 "}", i, tsMs);
    }
    if (n < 0) {
      break;
    }
    len += n;
  }
}

207 208 209 210 211 212 213 214 215
// Append "<i>:<lastSendIndex> <lastSendTime>" for every replica to buf, starting
// at offset 1 (after the "{" the callers in this file place there). Entries are
// separated by ", " and the last one is closed with '}'. Stops at the first
// replica without a peer state, or once buf is full.
static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
  int32_t len = 1;

  for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
    // snprintf reports the untruncated length, so len may reach or pass bufLen;
    // stop before buf + len / bufLen - len become invalid.
    if (len >= bufLen) {
      break;
    }

    SPeerState* pState = syncNodeGetPeerState(pSyncNode, &(pSyncNode->replicasId[i]));
    if (pState == NULL) {
      break;
    }

    int32_t n;
    if (i < pSyncNode->replicaNum - 1) {
      n = snprintf(buf + len, bufLen - len, "%d:%" PRId64 " %" PRId64 ", ", i, pState->lastSendIndex,
                   pState->lastSendTime);
    } else {
      n = snprintf(buf + len, bufLen - len, "%d:%" PRId64 " %" PRId64 "}", i, pState->lastSendIndex,
                   pState->lastSendTime);
    }
    if (n < 0) {
      break;
    }
    len += n;
  }
}

S
Shengliang Guan 已提交
224
// Format the caller's message (format, ...) and emit it through taosPrintLog
// together with a snapshot of the node state: term, commit/log indexes,
// snapshot info, counters, peer states, configuration and heartbeat times.
// Does nothing when pNode or its pRaftCfg / pRaftStore / pLogStore is NULL.
// terrno is saved on entry and restored before logging, so the helper calls
// made here cannot overwrite the caller's error code.
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) {
  if (pNode == NULL || pNode->pRaftCfg == NULL || pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return;
  int64_t currentTerm = pNode->pRaftStore->currentTerm;

  // save error code, otherwise it will be overwritten
  int32_t errCode = terrno;

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
    pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  }

  // pLogStore and pRaftCfg were verified non-NULL by the guard above
  SyncIndex logLastIndex = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  SyncIndex logBeginIndex = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);

  int32_t cacheHit = pNode->pLogStore->cacheHit;
  int32_t cacheMiss = pNode->pLogStore->cacheMiss;

  char cfgStr[1024];
  syncCfg2SimpleStr(&(pNode->pRaftCfg->cfg), cfgStr, sizeof(cfgStr));

  char peerStr[1024] = "{";
  syncPeerState2Str(pNode, peerStr, sizeof(peerStr));

  char hbrTimeStr[256] = "hbr:{";
  syncHearbeatReplyTime2Str(pNode, hbrTimeStr, sizeof(hbrTimeStr));

  char hbTimeStr[256] = "hb:{";
  syncHearbeatTime2Str(pNode, hbTimeStr, sizeof(hbTimeStr));

  int32_t quorum = syncNodeDynamicQuorum(pNode);

  char    eventLog[512];
  va_list argpointer;
  va_start(argpointer, format);
  int32_t writeLen = vsnprintf(eventLog, sizeof(eventLog), format, argpointer);
  va_end(argpointer);
  if (writeLen < 0) {
    // formatting failed: buffer content is unspecified, log an empty event text
    eventLog[0] = '\0';
  }

  int32_t aqItems = 0;
  if (pNode->pFsm != NULL && pNode->pFsm->FpApplyQueueItems != NULL) {
    aqItems = pNode->pFsm->FpApplyQueueItems(pNode->pFsm);
  }

  // restore error code
  terrno = errCode;

  taosPrintLog(flags, level, dflag,
               "vgId:%d, %s, sync:%s, term:%" PRIu64 ", commit-index:%" PRId64 ", first-ver:%" PRId64
               ", last-ver:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", snap-term:%" PRIu64
               ", elect-times:%d, as-leader-times:%d, cfg-ch-times:%d, hit:%d, mis:%d, hb-slow:%d, hbr-slow:%d, "
               "aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64
               ", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64 ", %s, %s, %s, %s",
               pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, logBeginIndex,
               logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->electNum,
               pNode->becomeLeaderNum, pNode->configChangeNum, cacheHit, cacheMiss, pNode->hbSlowNum,
               pNode->hbrSlowNum, aqItems, pNode->snapshottingIndex, pNode->replicaNum,
               pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum,
               pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr, hbTimeStr,
               hbrTimeStr);
}

// Format the caller's message (format, ...) and emit it through taosPrintLog
// together with the snapshot sender's fields (params, snapshot info, seq/ack,
// destination host:port) and a summary of the owning node's state.
// Does nothing when pSender, its node, or the node's pRaftCfg / pRaftStore /
// pLogStore is NULL.
void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender,
                                const char* format, ...) {
  if (pSender == NULL) return;
  SSyncNode* pNode = pSender->pSyncNode;
  if (pNode == NULL || pNode->pRaftCfg == NULL || pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return;

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
    pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  }

  // pLogStore was verified non-NULL by the guard above
  SyncIndex logLastIndex = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  SyncIndex logBeginIndex = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);

  char cfgStr[1024];
  syncCfg2SimpleStr(&(pNode->pRaftCfg->cfg), cfgStr, sizeof(cfgStr));

  char peerStr[1024] = "{";
  syncPeerState2Str(pNode, peerStr, sizeof(peerStr));

  int32_t  quorum = syncNodeDynamicQuorum(pNode);
  SRaftId  destId = pNode->replicasId[pSender->replicaIndex];
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(destId.addr, host, sizeof(host), &port);

  char    eventLog[512];
  va_list argpointer;
  va_start(argpointer, format);
  int32_t writeLen = vsnprintf(eventLog, sizeof(eventLog), format, argpointer);
  va_end(argpointer);
  if (writeLen < 0) {
    // formatting failed: buffer content is unspecified, log an empty event text
    eventLog[0] = '\0';
  }

  taosPrintLog(flags, level, dflag,
               "vgId:%d, %s, sync:%s, {%p s-param:%" PRId64 " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64
               " lcindex:%" PRId64
               " seq:%d ack:%d finish:%d replica-index:%d %s:%d}"
               ", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64
               ", snap-tm:%" PRIu64 ", sby:%d, stgy:%d, bch:%d, r-num:%d, lcfg:%" PRId64
               ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
               pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->snapshotParam.start,
               pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
               pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->replicaIndex,
               host, port, pNode->pRaftStore->currentTerm, pNode->commitIndex, logBeginIndex, logLastIndex,
               pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->pRaftCfg->isStandBy,
               pNode->pRaftCfg->snapshotStrategy, pNode->pRaftCfg->batchSize, pNode->replicaNum,
               pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum,
               pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);
}

// Format the caller's message (format, ...) and emit it through taosPrintLog
// together with the snapshot receiver's fields (start/ack/term, source
// host:port, params, snapshot info) and a summary of the owning node's state.
// Does nothing when pReceiver, its node, or the node's pRaftCfg / pRaftStore /
// pLogStore is NULL.
void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver,
                                  const char* format, ...) {
  if (pReceiver == NULL) return;
  SSyncNode* pNode = pReceiver->pSyncNode;
  if (pNode == NULL || pNode->pRaftCfg == NULL || pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return;

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
    pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  }

  // pLogStore was verified non-NULL by the guard above
  SyncIndex logLastIndex = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  SyncIndex logBeginIndex = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);

  char cfgStr[1024];
  syncCfg2SimpleStr(&(pNode->pRaftCfg->cfg), cfgStr, sizeof(cfgStr));

  char peerStr[1024] = "{";
  syncPeerState2Str(pNode, peerStr, sizeof(peerStr));

  int32_t  quorum = syncNodeDynamicQuorum(pNode);
  SRaftId  fromId = pReceiver->fromId;
  char     host[128];
  uint16_t port;
  syncUtilU642Addr(fromId.addr, host, sizeof(host), &port);

  char    eventLog[512];
  va_list argpointer;
  va_start(argpointer, format);
  int32_t writeLen = vsnprintf(eventLog, sizeof(eventLog), format, argpointer);
  va_end(argpointer);
  if (writeLen < 0) {
    // formatting failed: buffer content is unspecified, log an empty event text
    eventLog[0] = '\0';
  }

  taosPrintLog(flags, level, dflag,
               "vgId:%d, %s, sync:%s,"
               " {%p start:%d ack:%d term:%" PRIu64 " start-time:%" PRId64 " from:%s:%d s-param:%" PRId64
               " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64 " lcindex:%" PRId64
               "}"
               ", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64
               ", snap-tm:%" PRIu64 ", sby:%d, stgy:%d, bch:%d, r-num:%d, lcfg:%" PRId64
               ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
               pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->start, pReceiver->ack,
               pReceiver->term, pReceiver->startTime, host, port, pReceiver->snapshotParam.start,
               pReceiver->snapshotParam.end, pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm,
               pReceiver->snapshot.lastConfigIndex, pNode->pRaftStore->currentTerm, pNode->commitIndex, logBeginIndex,
               logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
               pNode->pRaftCfg->isStandBy, pNode->pRaftCfg->snapshotStrategy, pNode->pRaftCfg->batchSize,
               pNode->replicaNum, pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum,
               pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);
}
S
Shengliang Guan 已提交
399 400

// Trace-log a received sync timer message, including the milliseconds elapsed
// between pMsg->timeStamp and now. No-op unless DEBUG_TRACE is set in sDebugFlag.
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
  if (sDebugFlag & DEBUG_TRACE) {
    const int64_t nowMs = taosGetTimestampMs();
    const int64_t elapsedMs = nowMs - pMsg->timeStamp;

    sNTrace(pSyncNode,
            "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, ts:%" PRId64 ", elapsed:%" PRId64 ", data:%p}, %s",
            syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->timeStamp, elapsedMs,
            pMsg->data, s);
  }
}

// Trace-log a received local command (numeric cmd plus its name from
// syncLocalCmdGetStr). No-op unless DEBUG_TRACE is set in sDebugFlag.
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) {
  if (sDebugFlag & DEBUG_TRACE) {
    sNTrace(pSyncNode, "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s",
            pMsg->cmd, syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, pMsg->fcIndex, s);
  }
}

// Trace-log an outgoing append-entries reply, addressed by pMsg->destId.
// No-op unless DEBUG_TRACE is set in sDebugFlag.
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
  if (sDebugFlag & DEBUG_TRACE) {
    uint16_t peerPort;
    char     peerHost[64];
    syncUtilU642Addr(pMsg->destId.addr, peerHost, sizeof(peerHost), &peerPort);

    sNTrace(pSyncNode,
            "send sync-append-entries-reply to %s:%d, {term:%" PRId64 ", pterm:%" PRId64
            ", success:%d, lsend-index:%" PRId64 ", match:%" PRId64 "}, %s",
            peerHost, peerPort, pMsg->term, pMsg->lastMatchTerm, pMsg->success, pMsg->lastSendIndex,
            pMsg->matchIndex, s);
  }
}

// Trace-log an incoming append-entries reply, identified by pMsg->srcId.
// No-op unless DEBUG_TRACE is set in sDebugFlag.
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
  if (sDebugFlag & DEBUG_TRACE) {
    uint16_t peerPort;
    char     peerHost[64];
    syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);

    sNTrace(pSyncNode,
            "recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64
            ", success:%d, lsend-index:%" PRId64 ", match:%" PRId64 "}, %s",
            peerHost, peerPort, pMsg->term, pMsg->lastMatchTerm, pMsg->success, pMsg->lastSendIndex,
            pMsg->matchIndex, s);
  }
}

443 444
// Trace-log an outgoing heartbeat addressed by pMsg->destId. With printX set
// the line ends in ", x"; otherwise it carries timerElapsed and execTime.
// No-op unless DEBUG_TRACE is set in sDebugFlag.
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed,
                          int64_t execTime) {
  if (!(sDebugFlag & DEBUG_TRACE)) {
    return;
  }

  uint16_t peerPort;
  char     peerHost[64];
  syncUtilU642Addr(pMsg->destId.addr, peerHost, sizeof(peerHost), &peerPort);

  if (!printX) {
    sNTrace(pSyncNode,
            "send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64
            "}, timer-elapsed:%" PRId64 ", next-exec:%" PRId64,
            peerHost, peerPort, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timerElapsed,
            execTime);
  } else {
    sNTrace(pSyncNode,
            "send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64
            "}, x",
            peerHost, peerPort, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp);
  }
}

M
Minghao Li 已提交
464
// Log an incoming heartbeat from pMsg->srcId.
// When timeDiff exceeds SYNC_HEARTBEAT_SLOW_MS the node's hbSlowNum counter is
// bumped and a "slow" line is written at info level. Independently of that, a
// regular line is written at trace level when DEBUG_TRACE is set in sDebugFlag.
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff, const char* s) {
  if (timeDiff > SYNC_HEARTBEAT_SLOW_MS) {
    pSyncNode->hbSlowNum++;

    uint16_t slowPort;
    char     slowHost[64];
    syncUtilU642Addr(pMsg->srcId.addr, slowHost, sizeof(slowHost), &slowPort);
    sNInfo(pSyncNode,
           "recv sync-heartbeat from %s:%d slow {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64
           "}, %s, net elapsed:%" PRId64,
           slowHost, slowPort, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff);
  }

  if (sDebugFlag & DEBUG_TRACE) {
    uint16_t peerPort;
    char     peerHost[64];
    syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);
    sNTrace(pSyncNode,
            "recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64
            "}, %s, net elapsed:%" PRId64,
            peerHost, peerPort, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff);
  }
}

// Trace-log an outgoing heartbeat reply. The printed host:port is decoded from
// pMsg->destId, i.e. the peer the reply is sent to.
// No-op unless DEBUG_TRACE is set in sDebugFlag.
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
  if (!(sDebugFlag & DEBUG_TRACE)) return;

  char     host[64];
  uint16_t port;
  syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);

  // the address is the destination, so the message says "to" like the other send logs
  sNTrace(pSyncNode, "send sync-heartbeat-reply to %s:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s", host, port,
          pMsg->term, pMsg->timeStamp, s);
}

M
Minghao Li 已提交
499
// Log an incoming heartbeat reply from pMsg->srcId.
// When timeDiff exceeds SYNC_HEARTBEAT_REPLY_SLOW_MS the node's hbrSlowNum
// counter is bumped and a "slow" line is sent to sNTrace. Independently of
// that, a regular trace line is written when DEBUG_TRACE is set in sDebugFlag.
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, int64_t timeDiff, const char* s) {
  if (timeDiff > SYNC_HEARTBEAT_REPLY_SLOW_MS) {
    pSyncNode->hbrSlowNum++;

    uint16_t slowPort;
    char     slowHost[64];
    syncUtilU642Addr(pMsg->srcId.addr, slowHost, sizeof(slowHost), &slowPort);
    sNTrace(pSyncNode,
            "recv sync-heartbeat-reply from %s:%d slow {term:%" PRId64 ", ts:%" PRId64 "}, %s, net elapsed:%" PRId64,
            slowHost, slowPort, pMsg->term, pMsg->timeStamp, s, timeDiff);
  }

  if (sDebugFlag & DEBUG_TRACE) {
    uint16_t peerPort;
    char     peerHost[64];
    syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);
    sNTrace(pSyncNode,
            "recv sync-heartbeat-reply from %s:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s, net elapsed:%" PRId64,
            peerHost, peerPort, pMsg->term, pMsg->timeStamp, s, timeDiff);
  }
}

// Debug-log an outgoing snapshot-send message addressed by pMsg->destId.
// No-op unless DEBUG_DEBUG is set in sDebugFlag.
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
  if (sDebugFlag & DEBUG_DEBUG) {
    uint16_t peerPort;
    char     peerHost[64];
    syncUtilU642Addr(pMsg->destId.addr, peerHost, sizeof(peerHost), &peerPort);

    sNDebug(pSyncNode,
            "send sync-snapshot-send to %s:%u, %s, seq:%d, term:%" PRId64 ", begin:%" PRId64 ", end:%" PRId64
            ", lterm:%" PRId64 ", stime:%" PRId64,
            peerHost, peerPort, s, pMsg->seq, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm,
            pMsg->startTime);
  }
}

// Debug-log an incoming snapshot-send message from pMsg->srcId, including its
// data length. No-op unless DEBUG_DEBUG is set in sDebugFlag.
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
  if (sDebugFlag & DEBUG_DEBUG) {
    uint16_t peerPort;
    char     peerHost[64];
    syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);

    sNDebug(pSyncNode,
            "recv sync-snapshot-send from %s:%u, %s, seq:%d, term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64
            ", lterm:%" PRId64 ", stime:%" PRId64 ", len:%u",
            peerHost, peerPort, s, pMsg->seq, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm,
            pMsg->startTime, pMsg->dataLen);
  }
}

// Debug-log an outgoing snapshot response addressed by pMsg->destId.
// No-op unless DEBUG_DEBUG is set in sDebugFlag.
void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
  if (sDebugFlag & DEBUG_DEBUG) {
    uint16_t peerPort;
    char     peerHost[64];
    syncUtilU642Addr(pMsg->destId.addr, peerHost, sizeof(peerHost), &peerPort);

    sNDebug(pSyncNode,
            "send sync-snapshot-rsp to %s:%u, %s, ack:%d, term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64
            ", lterm:%" PRId64 ", stime:%" PRId64,
            peerHost, peerPort, s, pMsg->ack, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm,
            pMsg->startTime);
  }
}

// Debug-log an incoming snapshot response from pMsg->srcId.
// No-op unless DEBUG_DEBUG is set in sDebugFlag.
void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
  if (sDebugFlag & DEBUG_DEBUG) {
    uint16_t peerPort;
    char     peerHost[64];
    syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);

    sNDebug(pSyncNode,
            "recv sync-snapshot-rsp from %s:%u, %s, ack:%d, term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64
            ", lterm:%" PRId64 ", stime:%" PRId64,
            peerHost, peerPort, s, pMsg->ack, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm,
            pMsg->startTime);
  }
}

// Trace-log an incoming append-entries message from pMsg->srcId.
// No-op unless DEBUG_TRACE is set in sDebugFlag.
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
  if (sDebugFlag & DEBUG_TRACE) {
    uint16_t peerPort;
    char     peerHost[64];
    syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);

    sNTrace(pSyncNode,
            "recv sync-append-entries from %s:%d {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64
            ", cmt:%" PRId64 ", pterm:%" PRId64 ", datalen:%d}, %s",
            peerHost, peerPort, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex,
            pMsg->privateTerm, pMsg->dataLen, s);
  }
}

// Trace-log an outgoing append-entries message addressed by pMsg->destId.
// The "lsend-index" field is printed as prevLogIndex + 1.
// No-op unless DEBUG_TRACE is set in sDebugFlag.
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
  if (sDebugFlag & DEBUG_TRACE) {
    uint16_t peerPort;
    char     peerHost[64];
    syncUtilU642Addr(pMsg->destId.addr, peerHost, sizeof(peerHost), &peerPort);

    sNTrace(pSyncNode,
            "send sync-append-entries to %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64
            ", lsend-index:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s",
            peerHost, peerPort, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, (pMsg->prevLogIndex + 1),
            pMsg->commitIndex, pMsg->dataLen, s);
  }
}

S
Shengliang Guan 已提交
600
// Log an incoming request-vote message from pMsg->srcId at info level (there is
// no debug-flag gate here). When voteGranted is -1 the caller's text s is
// appended; otherwise the grant decision is printed instead.
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, int32_t voteGranted, const char* s) {
  char     host[64];
  uint16_t port;
  syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);

  if (voteGranted == -1) {
    sNInfo(pSyncNode,
           "recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", host,
           port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
  } else {
    sNInfo(pSyncNode,
           "recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, granted:%d",
           host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, voteGranted);
  }
}

// Log an outgoing request-vote message addressed by pMsg->destId at info level
// (no debug-flag gate).
void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s) {
  uint16_t peerPort;
  char     peerHost[64];
  syncUtilU642Addr(pMsg->destId.addr, peerHost, sizeof(peerHost), &peerPort);

  sNInfo(pNode, "send sync-request-vote to %s:%d {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s",
         peerHost, peerPort, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
}

// Log an incoming request-vote reply from pMsg->srcId at info level
// (no debug-flag gate).
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
  uint16_t peerPort;
  char     peerHost[64];
  syncUtilU642Addr(pMsg->srcId.addr, peerHost, sizeof(peerHost), &peerPort);

  sNInfo(pSyncNode, "recv sync-request-vote-reply from %s:%d {term:%" PRId64 ", grant:%d}, %s", peerHost, peerPort,
         pMsg->term, pMsg->voteGranted, s);
}

// Log an outgoing request-vote reply addressed by pMsg->destId at info level
// (no debug-flag gate).
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
  uint16_t peerPort;
  char     peerHost[64];
  syncUtilU642Addr(pMsg->destId.addr, peerHost, sizeof(peerHost), &peerPort);

  sNInfo(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRId64 ", grant:%d}, %s", peerHost, peerPort,
         pMsg->term, pMsg->voteGranted, s);
}