syncUtil.c 14.2 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "syncUtil.h"
S
Shengliang Guan 已提交
18 19 20
#include "syncRaftCfg.h"
#include "syncRaftStore.h"
#include "syncSnapshot.h"
M
Minghao Li 已提交
21

S
Shengliang Guan 已提交
22
extern void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port);
M
Minghao Li 已提交
23

M
Minghao Li 已提交
24
uint64_t syncUtilAddr2U64(const char* host, uint16_t port) {
25 26
  uint32_t hostU32 = taosGetIpv4FromFqdn(host);
  if (hostU32 == (uint32_t)-1) {
S
Shengliang Guan 已提交
27
    sError("failed to resolve ipv4 addr, host:%s", host);
28
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
29 30 31
    return -1;
  }

S
Shengliang Guan 已提交
32
  uint64_t u64 = (((uint64_t)hostU32) << 32) | (((uint32_t)port) << 16);
M
Minghao Li 已提交
33 34
  return u64;
}
M
Minghao Li 已提交
35

S
Shengliang Guan 已提交
36
void syncUtilU642Addr(uint64_t u64, char* host, int64_t len, uint16_t* port) {
M
Minghao Li 已提交
37
  uint32_t hostU32 = (uint32_t)((u64 >> 32) & 0x00000000FFFFFFFF);
M
Minghao Li 已提交
38

S
Shengliang Guan 已提交
39
  struct in_addr addr = {.s_addr = hostU32};
wafwerar's avatar
wafwerar 已提交
40
  taosInetNtoa(addr, host, len);
M
Minghao Li 已提交
41 42 43
  *port = (uint16_t)((u64 & 0x00000000FFFF0000) >> 16);
}

S
Shengliang Guan 已提交
44
// Reset `pEpSet` (no endpoints, first slot in use) and then pass the node's
// FQDN and port to addEpIntoEpSet().
void syncUtilnodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet) {
  pEpSet->numOfEps = 0;
  pEpSet->inUse = 0;

  addEpIntoEpSet(pEpSet, pInfo->nodeFqdn, pInfo->nodePort);
}

// Reset `pEpSet`, decode the host/port packed in `raftId->addr`, and pass
// that endpoint to addEpIntoEpSet().
void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) {
  uint16_t decodedPort;
  char     decodedHost[TSDB_FQDN_LEN] = {0};
  syncUtilU642Addr(raftId->addr, decodedHost, sizeof(decodedHost), &decodedPort);

  pEpSet->numOfEps = 0;
  pEpSet->inUse = 0;
  addEpIntoEpSet(pEpSet, decodedHost, decodedPort);
}

S
Shengliang Guan 已提交
60 61
// Fill `raftId` from a node description and a group id.
// The node FQDN is resolved with taosGetIpv4FromFqdn(); a result of
// 0xFFFFFFFF or 1 is treated as failure: an error is logged, terrno is set
// to TSDB_CODE_TSC_INVALID_FQDN and false is returned with `raftId` untouched.
// Otherwise the address is formatted with tinet_ntoa(), packed together with
// the node port by syncUtilAddr2U64(), and true is returned.
bool syncUtilnodeInfo2raftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId) {
  const uint32_t resolved = taosGetIpv4FromFqdn(pInfo->nodeFqdn);
  const bool     unresolved = (resolved == 0xFFFFFFFF) || (resolved == 1);

  if (unresolved) {
    sError("failed to resolve ipv4 addr, fqdn: %s", pInfo->nodeFqdn);
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return false;
  }

  char dotted[128] = {0};
  tinet_ntoa(dotted, resolved);

  raftId->addr = syncUtilAddr2U64(dotted, pInfo->nodePort);
  raftId->vgId = vgId;
  return true;
}

M
Minghao Li 已提交
75 76 77 78
// Two raft ids are the same when both their addr and their vgId match.
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) {
  if (pId1->addr != pId2->addr) {
    return false;
  }
  return pId1->vgId == pId2->vgId;
}
M
Minghao Li 已提交
79 80

// A raft id is empty when both addr and vgId are zero.
bool syncUtilEmptyId(const SRaftId* pId) {
  const bool noAddr = (pId->addr == 0);
  return noAddr && pId->vgId == 0;
}
M
Minghao Li 已提交
81

S
Shengliang Guan 已提交
82
// Return taosRand() reduced modulo `max`.
// A `max` of zero would make the modulo undefined (division by zero), so that
// case returns 0 without calling taosRand().
static inline int32_t syncUtilRand(int32_t max) {
  if (max == 0) {
    return 0;
  }
  return taosRand() % max;
}
M
Minghao Li 已提交
83

M
Minghao Li 已提交
84
// Return `min` plus a pseudo-random offset obtained from
// syncUtilRand(max - min).
int32_t syncUtilElectRandomMS(int32_t min, int32_t max) {
  const int32_t span = max - min;
  const int32_t offset = syncUtilRand(span);
  return min + offset;
}
M
Minghao Li 已提交
90

M
Minghao Li 已提交
91 92 93
// Majority size for `replicaNum` replicas: half (integer division) plus one.
int32_t syncUtilQuorum(int32_t replicaNum) {
  const int32_t half = replicaNum / 2;
  return half + 1;
}

// Build {"SNodeInfo": {"nodeFqdn": ..., "nodePort": ...}} from `p`.
// Returns the outer object, which the caller releases with cJSON_Delete(),
// or NULL when the outer object cannot be created.
cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p) {
  cJSON* pRoot = cJSON_CreateObject();

  cJSON_AddStringToObject(pRoot, "nodeFqdn", p->nodeFqdn);
  cJSON_AddNumberToObject(pRoot, "nodePort", p->nodePort);

  cJSON* pJson = cJSON_CreateObject();
  if (pJson == NULL) {
    // No wrapper exists to take ownership of pRoot, so release it here
    // instead of leaking it.
    cJSON_Delete(pRoot);
    return NULL;
  }

  cJSON_AddItemToObject(pJson, "SNodeInfo", pRoot);
  return pJson;
}

// Build {"SRaftId": {"addr", "host", "port", "vgId"}} from `p`.
// "addr" is the raw 64-bit address printed as a decimal string; "host" and
// "port" are decoded from it with syncUtilU642Addr().
// Returns the outer object, which the caller releases with cJSON_Delete(),
// or NULL when the outer object cannot be created.
cJSON* syncUtilRaftId2Json(const SRaftId* p) {
  cJSON* pRoot = cJSON_CreateObject();

  char u64buf[128] = {0};
  snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", p->addr);
  cJSON_AddStringToObject(pRoot, "addr", u64buf);

  char     host[128] = {0};
  uint16_t port;
  syncUtilU642Addr(p->addr, host, sizeof(host), &port);
  cJSON_AddStringToObject(pRoot, "host", host);
  cJSON_AddNumberToObject(pRoot, "port", port);
  cJSON_AddNumberToObject(pRoot, "vgId", p->vgId);

  cJSON* pJson = cJSON_CreateObject();
  if (pJson == NULL) {
    // No wrapper exists to take ownership of pRoot, so release it here
    // instead of leaking it.
    cJSON_Delete(pRoot);
    return NULL;
  }

  cJSON_AddItemToObject(pJson, "SRaftId", pRoot);
  return pJson;
}

M
Minghao Li 已提交
123 124 125 126 127 128 129
// Serialize `p` via syncUtilRaftId2Json() and cJSON_Print().
// The temporary JSON tree is deleted before returning; the returned string
// is whatever cJSON_Print() produced.
char* syncUtilRaftId2Str(const SRaftId* p) {
  cJSON* tree = syncUtilRaftId2Json(p);
  char*  text = cJSON_Print(tree);

  cJSON_Delete(tree);
  return text;
}

S
Shengliang Guan 已提交
130
// True when `c` lies in the range 32..126 inclusive (space through '~').
static inline bool syncUtilCanPrint(char c) {
  const bool belowRange = (c < 32);
  const bool aboveRange = (c > 126);
  return !(belowRange || aboveRange);
}

S
Shengliang Guan 已提交
138
// Return a newly allocated, NUL-terminated copy of the `len` bytes at `ptr`
// in which every byte rejected by syncUtilCanPrint() is replaced with '.'.
// The buffer comes from taosMemoryMalloc(); releasing it is left to the caller.
char* syncUtilPrintBin(char* ptr, uint32_t len) {
  // Compute the size in 64 bits so that len == UINT32_MAX cannot wrap to 0.
  int64_t memLen = (int64_t)len + 1;
  char*   s = taosMemoryMalloc(memLen);
  ASSERT(s != NULL);
  if (s == NULL) {
    // NOTE(review): only reachable if ASSERT does not abort in this build.
    return NULL;
  }

  if (len > 0) {
    memcpy(s, ptr, len);
  }
  s[len] = '\0';

  // Unsigned index: a signed one would overflow for len > INT32_MAX.
  for (uint32_t i = 0; i < len; ++i) {
    if (!syncUtilCanPrint(s[i])) {
      s[i] = '.';
    }
  }
  return s;
}

S
Shengliang Guan 已提交
153
// Return a newly allocated string listing the `len` bytes at `ptr` as
// decimal numbers, each followed by a comma (e.g. "1,2,-3,").
// The buffer comes from taosMemoryMalloc(); releasing it is left to the caller.
char* syncUtilPrintBin2(char* ptr, uint32_t len) {
  // Worst case per byte is "-128," (5 characters) when char is signed, plus
  // one terminating NUL. Sized in 64 bits so the multiplication cannot wrap.
  int64_t cap = (int64_t)len * 5 + 1;
  char*   s = taosMemoryMalloc(cap);
  ASSERT(s != NULL);
  if (s == NULL) {
    // NOTE(review): only reachable if ASSERT does not abort in this build.
    return NULL;
  }
  memset(s, 0, (size_t)cap);

  char*   p = s;
  int64_t left = cap;
  for (uint32_t i = 0; i < len; ++i) {
    int32_t n = snprintf(p, (size_t)left, "%d,", ptr[i]);
    if (n < 0 || n >= left) {
      // Formatting error or no room left; stop with what has been written.
      break;
    }
    p += n;
    left -= n;
  }
  return s;
}

M
Minghao Li 已提交
167 168 169 170 171 172 173 174 175 176 177 178
void syncUtilMsgHtoN(void* msg) {
  // htonl
  SMsgHead* pHead = msg;
  pHead->contLen = htonl(pHead->contLen);
  pHead->vgId = htonl(pHead->vgId);
}

void syncUtilMsgNtoH(void* msg) {
  // ntohl
  SMsgHead* pHead = msg;
  pHead->contLen = ntohl(pHead->contLen);
  pHead->vgId = ntohl(pHead->vgId);
M
Minghao Li 已提交
179 180
}

S
Shengliang Guan 已提交
181
// True for every message type except TDMT_SYNC_NOOP and
// TDMT_SYNC_LEADER_TRANSFER.
bool syncUtilUserPreCommit(tmsg_t msgType) {
  const bool excluded = (msgType == TDMT_SYNC_NOOP) || (msgType == TDMT_SYNC_LEADER_TRANSFER);
  return !excluded;
}
M
Minghao Li 已提交
182

S
Shengliang Guan 已提交
183
// True for every message type except TDMT_SYNC_NOOP and
// TDMT_SYNC_LEADER_TRANSFER.
bool syncUtilUserCommit(tmsg_t msgType) {
  const bool excluded = (msgType == TDMT_SYNC_NOOP) || (msgType == TDMT_SYNC_LEADER_TRANSFER);
  return !excluded;
}
M
Minghao Li 已提交
184

S
Shengliang Guan 已提交
185
// True for every message type except TDMT_SYNC_NOOP and
// TDMT_SYNC_LEADER_TRANSFER.
bool syncUtilUserRollback(tmsg_t msgType) {
  const bool excluded = (msgType == TDMT_SYNC_NOOP) || (msgType == TDMT_SYNC_LEADER_TRANSFER);
  return !excluded;
}
M
Minghao Li 已提交
186

187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
// Write a one-line summary of `pCfg` into `buf` (capacity `bufLen`):
//   {r-num:<replicaNum>, my:<myIndex>, <fqdn>:<port>, ..., <fqdn>:<port>}
// Output that does not fit is truncated; nothing is ever written past
// buf[bufLen - 1].
void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) {
  if (buf == NULL || bufLen <= 0) return;

  int32_t len = snprintf(buf, bufLen, "{r-num:%d, my:%d, ", pCfg->replicaNum, pCfg->myIndex);

  for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
    // snprintf returns the length it WANTED to write. Once that reaches
    // bufLen the buffer is full, and bufLen - len would turn into a huge
    // size_t on the next call, so stop here.
    if (len < 0 || len >= bufLen) return;

    int32_t n;
    if (i < pCfg->replicaNum - 1) {
      n = snprintf(buf + len, bufLen - len, "%s:%d, ", pCfg->nodeInfo[i].nodeFqdn, pCfg->nodeInfo[i].nodePort);
    } else {
      n = snprintf(buf + len, bufLen - len, "%s:%d}", pCfg->nodeInfo[i].nodeFqdn, pCfg->nodeInfo[i].nodePort);
    }
    if (n < 0) return;
    len += n;
  }
}

// Append per-replica send state to `buf` (capacity `bufLen`) as
//   <i>:<lastSendIndex> <lastSendTime>, ... }
// Writing starts at offset 1: the callers in this file pre-fill buf with "{".
// Stops at the first replica for which syncNodeGetPeerState() returns NULL.
// Output that does not fit is truncated; nothing is written past
// buf[bufLen - 1].
static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
  if (buf == NULL || bufLen <= 1) return;

  int32_t len = 1;

  for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
    SPeerState* pState = syncNodeGetPeerState(pSyncNode, &(pSyncNode->replicasId[i]));
    if (pState == NULL) break;

    // snprintf returns the length it WANTED to write. Once len reaches
    // bufLen the buffer is full, and bufLen - len would turn into a huge
    // size_t on the next call, so stop here.
    if (len >= bufLen) break;

    int32_t n;
    if (i < pSyncNode->replicaNum - 1) {
      n = snprintf(buf + len, bufLen - len, "%d:%" PRId64 " %" PRId64 ", ", i, pState->lastSendIndex,
                   pState->lastSendTime);
    } else {
      n = snprintf(buf + len, bufLen - len, "%d:%" PRId64 " %" PRId64 "}", i, pState->lastSendIndex,
                   pState->lastSendTime);
    }
    if (n < 0) break;
    len += n;
  }
}

S
Shengliang Guan 已提交
216 217
// Emit one log line describing `pNode`, prefixed with the caller's formatted
// message (`format` plus variadic arguments, truncated to 511 characters).
// The line carries term, commit/log/snapshot indexes, config flags, timer
// clocks, peer send state and the config summary.
// Returns without logging when pNode or any of pRaftCfg, pRaftStore,
// pLogStore is NULL. terrno is saved on entry and restored before the line
// is printed.
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) {
  // pRaftCfg, pRaftStore and pLogStore are all dereferenced unconditionally
  // below, so each of them has to be present.
  if (pNode == NULL || pNode->pRaftCfg == NULL || pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return;

  // save error code, otherwise it will be overwritten
  int32_t errCode = terrno;

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
    pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  }

  SyncIndex logLastIndex = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  SyncIndex logBeginIndex = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);

  char cfgStr[1024] = {0};
  syncCfg2SimpleStr(&(pNode->pRaftCfg->cfg), cfgStr, sizeof(cfgStr));

  char peerStr[1024] = "{";
  syncPeerState2Str(pNode, peerStr, sizeof(peerStr));

  int32_t quorum = syncNodeDynamicQuorum(pNode);

  char    eventLog[512];
  va_list argpointer;
  va_start(argpointer, format);
  (void)vsnprintf(eventLog, sizeof(eventLog), format, argpointer);
  va_end(argpointer);

  // pFsm is optional (see the snapshot lookup above); report 0 queued items
  // when it or its callback is absent instead of dereferencing NULL.
  int32_t aqItems = 0;
  if (pNode->pFsm != NULL && pNode->pFsm->FpApplyQueueItems != NULL) {
    aqItems = pNode->pFsm->FpApplyQueueItems(pNode->pFsm);
  }

  // restore error code
  terrno = errCode;

  taosPrintLog(flags, level, dflag,
               "vgId:%d, sync %s "
               "%s"
               ", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64
               ", snap-tm:%" PRIu64 ", sby:%d, aq:%d, bch:%d, r-num:%d, lcfg:%" PRId64
               ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
               pNode->vgId, syncStr(pNode->state), eventLog, pNode->pRaftStore->currentTerm, pNode->commitIndex,
               logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
               pNode->pRaftCfg->isStandBy, aqItems, pNode->pRaftCfg->batchSize, pNode->replicaNum,
               pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum,
               pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);
}

// Emit one log line describing snapshot sender `pSender` and its sync node,
// prefixed with the caller's formatted message (`format` plus variadic
// arguments, truncated to 511 characters).
// Returns without logging when pSender, its node, or any of the node's
// pRaftCfg, pRaftStore, pLogStore is NULL.
void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender,
                                const char* format, ...) {
  if (pSender == NULL) return;

  SSyncNode* pNode = pSender->pSyncNode;
  // pRaftCfg, pRaftStore and pLogStore are all dereferenced unconditionally
  // below, so each of them has to be present.
  if (pNode == NULL || pNode->pRaftCfg == NULL || pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return;

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
    pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  }

  SyncIndex logLastIndex = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  SyncIndex logBeginIndex = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);

  char cfgStr[1024] = {0};
  syncCfg2SimpleStr(&(pNode->pRaftCfg->cfg), cfgStr, sizeof(cfgStr));

  char peerStr[1024] = "{";
  syncPeerState2Str(pNode, peerStr, sizeof(peerStr));

  int32_t quorum = syncNodeDynamicQuorum(pNode);

  // Destination of this sender: the replica at pSender->replicaIndex.
  SRaftId  destId = pNode->replicasId[pSender->replicaIndex];
  char     host[64] = {0};
  uint16_t port = 0;
  syncUtilU642Addr(destId.addr, host, sizeof(host), &port);

  char    eventLog[512];
  va_list argpointer;
  va_start(argpointer, format);
  (void)vsnprintf(eventLog, sizeof(eventLog), format, argpointer);
  va_end(argpointer);

  taosPrintLog(flags, level, dflag,
               "vgId:%d, sync %s "
               "%s {%p s-param:%" PRId64 " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64 " lcindex:%" PRId64
               " seq:%d ack:%d finish:%d replica-index:%d %s:%d}"
               ", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64
               ", snap-tm:%" PRIu64 ", sby:%d, stgy:%d, bch:%d, r-num:%d, lcfg:%" PRId64
               ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
               pNode->vgId, syncStr(pNode->state), eventLog, pSender, pSender->snapshotParam.start,
               pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
               pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->replicaIndex,
               host, port, pNode->pRaftStore->currentTerm, pNode->commitIndex, logBeginIndex, logLastIndex,
               pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->pRaftCfg->isStandBy,
               pNode->pRaftCfg->snapshotStrategy, pNode->pRaftCfg->batchSize, pNode->replicaNum,
               pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum,
               pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);
}

// Emit one log line describing snapshot receiver `pReceiver` and its sync
// node, prefixed with the caller's formatted message (`format` plus variadic
// arguments, truncated to 511 characters).
// Returns without logging when pReceiver, its node, or any of the node's
// pRaftCfg, pRaftStore, pLogStore is NULL.
void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver,
                                  const char* format, ...) {
  if (pReceiver == NULL) return;

  SSyncNode* pNode = pReceiver->pSyncNode;
  // pRaftCfg, pRaftStore and pLogStore are all dereferenced unconditionally
  // below, so each of them has to be present.
  if (pNode == NULL || pNode->pRaftCfg == NULL || pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return;

  SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
  if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
    pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
  }

  SyncIndex logLastIndex = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  SyncIndex logBeginIndex = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);

  char cfgStr[1024] = {0};
  syncCfg2SimpleStr(&(pNode->pRaftCfg->cfg), cfgStr, sizeof(cfgStr));

  char peerStr[1024] = "{";
  syncPeerState2Str(pNode, peerStr, sizeof(peerStr));

  int32_t quorum = syncNodeDynamicQuorum(pNode);

  // Decode the address stored in pReceiver->fromId for the "from:" field.
  SRaftId  fromId = pReceiver->fromId;
  char     host[128] = {0};
  uint16_t port = 0;
  syncUtilU642Addr(fromId.addr, host, sizeof(host), &port);

  char    eventLog[512];
  va_list argpointer;
  va_start(argpointer, format);
  (void)vsnprintf(eventLog, sizeof(eventLog), format, argpointer);
  va_end(argpointer);

  taosPrintLog(flags, level, dflag,
               "vgId:%d, sync %s "
               "%s {%p start:%d ack:%d term:%" PRIu64 " start-time:%" PRId64 " from:%s:%d s-param:%" PRId64
               " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64 " lcindex:%" PRId64
               "}"
               ", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64
               ", snap-tm:%" PRIu64 ", sby:%d, stgy:%d, bch:%d, r-num:%d, lcfg:%" PRId64
               ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
               pNode->vgId, syncStr(pNode->state), eventLog, pReceiver, pReceiver->start, pReceiver->ack,
               pReceiver->term, pReceiver->startTime, host, port, pReceiver->snapshotParam.start,
               pReceiver->snapshotParam.end, pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm,
               pReceiver->snapshot.lastConfigIndex, pNode->pRaftStore->currentTerm, pNode->commitIndex, logBeginIndex,
               logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
               pNode->pRaftCfg->isStandBy, pNode->pRaftCfg->snapshotStrategy, pNode->pRaftCfg->batchSize,
               pNode->replicaNum, pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum,
               pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);
}