syncRaftLog.c 20.5 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncRaftLog.h"
M
Minghao Li 已提交
17
#include "wal.h"
M
Minghao Li 已提交
18

M
Minghao Li 已提交
19 20 21 22
// refactor, log[0 .. n] ==> log[m .. n]
static int32_t   raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex beginIndex);
static SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore);
static SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore);
M
Minghao Li 已提交
23
static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore);
M
Minghao Li 已提交
24 25 26 27 28 29 30 31 32 33 34 35
static bool      raftLogIsEmpty(struct SSyncLogStore* pLogStore);
static int32_t   raftLogEntryCount(struct SSyncLogStore* pLogStore);
static bool      raftLogInRange(struct SSyncLogStore* pLogStore, SyncIndex index);
static SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore);
static SyncTerm  raftLogLastTerm(struct SSyncLogStore* pLogStore);
static int32_t   raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
static int32_t   raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry);
static int32_t   raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);

static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry);

//-------------------------------
36
static SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore);
M
Minghao Li 已提交
37 38 39 40 41 42 43
static SyncIndex       logStoreLastIndex(SSyncLogStore* pLogStore);
static SyncTerm        logStoreLastTerm(SSyncLogStore* pLogStore);
static SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index);
static int32_t         logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
static int32_t         logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex);
static int32_t         logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index);
static SyncIndex       logStoreGetCommitIndex(SSyncLogStore* pLogStore);
44

M
Minghao Li 已提交
45 46
// refactor, log[0 .. n] ==> log[m .. n]
static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex beginIndex) {
M
Minghao Li 已提交
47 48
  sTrace("raftLogSetBeginIndex beginIndex:%ld", beginIndex);

M
Minghao Li 已提交
49 50 51 52 53 54 55 56 57 58
  // if beginIndex == 0, donot need call this funciton
  ASSERT(beginIndex > 0);

  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  pData->beginIndex = beginIndex;
  walRestoreFromSnapshot(pWal, beginIndex - 1);
  return 0;
}

M
Minghao Li 已提交
59 60
int32_t raftLogResetBeginIndex(struct SSyncLogStore* pLogStore) { return 0; }

M
Minghao Li 已提交
61 62 63 64 65 66 67 68 69 70 71
static SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  return pData->beginIndex;
}

static SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore) { return raftLogLastIndex(pLogStore); }

static bool raftLogIsEmpty(struct SSyncLogStore* pLogStore) {
  SyncIndex beginIndex = raftLogBeginIndex(pLogStore);
  SyncIndex endIndex = raftLogEndIndex(pLogStore);
M
Minghao Li 已提交
72
  return (endIndex < beginIndex);
M
Minghao Li 已提交
73 74 75 76 77
}

static int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore) {
  SyncIndex beginIndex = raftLogBeginIndex(pLogStore);
  SyncIndex endIndex = raftLogEndIndex(pLogStore);
M
Minghao Li 已提交
78
  int32_t   count = endIndex - beginIndex + 1;
M
Minghao Li 已提交
79 80 81 82 83 84 85 86 87 88 89 90 91 92
  return count > 0 ? count : 0;
}

static bool raftLogInRange(struct SSyncLogStore* pLogStore, SyncIndex index) {
  SyncIndex beginIndex = raftLogBeginIndex(pLogStore);
  SyncIndex endIndex = raftLogEndIndex(pLogStore);
  if (index >= beginIndex && index <= endIndex) {
    return true;
  } else {
    return false;
  }
}

static SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore) {
M
Minghao Li 已提交
93
  SyncIndex          lastIndex;
M
Minghao Li 已提交
94 95 96
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  SyncIndex          lastVer = walGetLastVer(pWal);
M
Minghao Li 已提交
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
  SyncIndex          firstVer = walGetFirstVer(pWal);

  if (lastVer < firstVer) {
    // no record
    lastIndex = -1;

  } else {
    if (firstVer >= 0) {
      lastIndex = lastVer;
    } else if (firstVer == -1) {
      lastIndex = -1;
    } else {
      ASSERT(0);
    }
  }

  return lastIndex;
}

static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  SyncIndex          lastVer = walGetLastVer(pWal);
  return lastVer + 1;
M
Minghao Li 已提交
121 122 123 124
}

static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
  SyncTerm lastTerm = 0;
M
Minghao Li 已提交
125
  if (raftLogEntryCount(pLogStore) == 0) {
M
Minghao Li 已提交
126 127 128 129 130
    lastTerm = 0;
  } else {
    SSyncRaftEntry* pLastEntry;
    int32_t         code = raftLogGetLastEntry(pLogStore, &pLastEntry);
    ASSERT(code == 0);
M
Minghao Li 已提交
131 132 133 134
    if (pLastEntry != NULL) {
      lastTerm = pLastEntry->term;
      taosMemoryFree(pLastEntry);
    }
M
Minghao Li 已提交
135 136 137 138 139 140 141 142
  }
  return lastTerm;
}

static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;

M
Minghao Li 已提交
143 144
  SyncIndex writeIndex = raftLogWriteIndex(pLogStore);
  ASSERT(pEntry->index == writeIndex);
M
Minghao Li 已提交
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162

  int          code = 0;
  SSyncLogMeta syncMeta;
  syncMeta.isWeek = pEntry->isWeak;
  syncMeta.seqNum = pEntry->seqNum;
  syncMeta.term = pEntry->term;
  code = walWriteWithSyncInfo(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
  if (code != 0) {
    int32_t     err = terrno;
    const char* errStr = tstrerror(err);
    int32_t     linuxErr = errno;
    const char* linuxErrMsg = strerror(errno);
    sError("raftLogAppendEntry error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
           linuxErrMsg);
    ASSERT(0);
  }

  walFsync(pWal, true);
163

164
  sTrace("sync event write index:%" PRId64, pEntry->index);
165

M
Minghao Li 已提交
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
  return code;
}

static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  int32_t            code;

  *ppEntry = NULL;
  if (raftLogInRange(pLogStore, index)) {
    SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
    ASSERT(pWalHandle != NULL);

    code = walReadWithHandle(pWalHandle, index);
    if (code != 0) {
      int32_t     err = terrno;
      const char* errStr = tstrerror(err);
      int32_t     linuxErr = errno;
      const char* linuxErrMsg = strerror(errno);
      sError("raftLogGetEntry error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
             linuxErrMsg);
      walCloseReadHandle(pWalHandle);
      ASSERT(0);
      return code;
    }

    *ppEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
    ASSERT(*ppEntry != NULL);
194
    (*ppEntry)->msgType = TDMT_SYNC_CLIENT_REQUEST;
M
Minghao Li 已提交
195 196 197 198 199 200 201 202 203 204 205 206 207
    (*ppEntry)->originalRpcType = pWalHandle->pHead->head.msgType;
    (*ppEntry)->seqNum = pWalHandle->pHead->head.syncMeta.seqNum;
    (*ppEntry)->isWeak = pWalHandle->pHead->head.syncMeta.isWeek;
    (*ppEntry)->term = pWalHandle->pHead->head.syncMeta.term;
    (*ppEntry)->index = index;
    ASSERT((*ppEntry)->dataLen == pWalHandle->pHead->head.bodyLen);
    memcpy((*ppEntry)->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);

    // need to hold, do not new every time!!
    walCloseReadHandle(pWalHandle);

  } else {
    // index not in range
M
Minghao Li 已提交
208
    code = 0;
M
Minghao Li 已提交
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230
  }

  return code;
}

static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  int32_t            code = walRollback(pWal, fromIndex);
  if (code != 0) {
    int32_t     err = terrno;
    const char* errStr = tstrerror(err);
    int32_t     linuxErr = errno;
    const char* linuxErrMsg = strerror(errno);
    sError("raftLogTruncate error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
           linuxErrMsg);
    ASSERT(0);
  }
  return code;
}

static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry) {
M
Minghao Li 已提交
231
  *ppLastEntry = NULL;
M
Minghao Li 已提交
232
  if (raftLogEntryCount(pLogStore) == 0) {
M
Minghao Li 已提交
233
    return 0;
M
Minghao Li 已提交
234 235 236 237 238 239 240
  }
  SyncIndex lastIndex = raftLogLastIndex(pLogStore);
  int32_t   code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry);
  return code;
}

//-------------------------------
M
Minghao Li 已提交
241
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
wafwerar's avatar
wafwerar 已提交
242
  SSyncLogStore* pLogStore = taosMemoryMalloc(sizeof(SSyncLogStore));
M
Minghao Li 已提交
243
  assert(pLogStore != NULL);
M
Minghao Li 已提交
244

wafwerar's avatar
wafwerar 已提交
245
  pLogStore->data = taosMemoryMalloc(sizeof(SSyncLogStoreData));
M
Minghao Li 已提交
246 247 248 249 250 251
  assert(pLogStore->data != NULL);

  SSyncLogStoreData* pData = pLogStore->data;
  pData->pSyncNode = pSyncNode;
  pData->pWal = pSyncNode->pWal;

M
Minghao Li 已提交
252 253 254 255 256 257 258 259 260 261
  SyncIndex firstVer = walGetFirstVer(pData->pWal);
  SyncIndex lastVer = walGetLastVer(pData->pWal);
  if (firstVer >= 0) {
    pData->beginIndex = firstVer;
  } else if (firstVer == -1) {
    pData->beginIndex = lastVer + 1;
  } else {
    ASSERT(0);
  }

M
Minghao Li 已提交
262 263 264 265 266 267 268
  pLogStore->appendEntry = logStoreAppendEntry;
  pLogStore->getEntry = logStoreGetEntry;
  pLogStore->truncate = logStoreTruncate;
  pLogStore->getLastIndex = logStoreLastIndex;
  pLogStore->getLastTerm = logStoreLastTerm;
  pLogStore->updateCommitIndex = logStoreUpdateCommitIndex;
  pLogStore->getCommitIndex = logStoreGetCommitIndex;
M
Minghao Li 已提交
269 270 271 272 273 274 275 276 277 278 279 280

  pLogStore->syncLogSetBeginIndex = raftLogSetBeginIndex;
  pLogStore->syncLogBeginIndex = raftLogBeginIndex;
  pLogStore->syncLogEndIndex = raftLogEndIndex;
  pLogStore->syncLogIsEmpty = raftLogIsEmpty;
  pLogStore->syncLogEntryCount = raftLogEntryCount;
  pLogStore->syncLogInRange = raftLogInRange;
  pLogStore->syncLogLastIndex = raftLogLastIndex;
  pLogStore->syncLogLastTerm = raftLogLastTerm;
  pLogStore->syncLogAppendEntry = raftLogAppendEntry;
  pLogStore->syncLogGetEntry = raftLogGetEntry;
  pLogStore->syncLogTruncate = raftLogTruncate;
M
Minghao Li 已提交
281
  pLogStore->syncLogWriteIndex = raftLogWriteIndex;
M
Minghao Li 已提交
282

M
Minghao Li 已提交
283
  return pLogStore;
M
Minghao Li 已提交
284 285 286 287
}

void logStoreDestory(SSyncLogStore* pLogStore) {
  if (pLogStore != NULL) {
wafwerar's avatar
wafwerar 已提交
288 289
    taosMemoryFree(pLogStore->data);
    taosMemoryFree(pLogStore);
M
Minghao Li 已提交
290 291 292
  }
}

M
Minghao Li 已提交
293
//-------------------------------
M
Minghao Li 已提交
294 295 296 297
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;

M
Minghao Li 已提交
298 299
  SyncIndex lastIndex = logStoreLastIndex(pLogStore);
  assert(pEntry->index == lastIndex + 1);
M
Minghao Li 已提交
300

M
Minghao Li 已提交
301 302 303 304 305 306
  int          code = 0;
  SSyncLogMeta syncMeta;
  syncMeta.isWeek = pEntry->isWeak;
  syncMeta.seqNum = pEntry->seqNum;
  syncMeta.term = pEntry->term;
  code = walWriteWithSyncInfo(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
M
Minghao Li 已提交
307
  if (code != 0) {
M
Minghao Li 已提交
308 309 310 311 312 313
    int32_t     err = terrno;
    const char* errStr = tstrerror(err);
    int32_t     linuxErr = errno;
    const char* linuxErrMsg = strerror(errno);
    sError("walWriteWithSyncInfo error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
           linuxErrMsg);
M
Minghao Li 已提交
314
    ASSERT(0);
M
Minghao Li 已提交
315 316
  }
  // assert(code == 0);
M
Minghao Li 已提交
317

M
Minghao Li 已提交
318
  walFsync(pWal, true);
319 320

  sTrace("sync event old write wal: %ld", pEntry->index);
M
Minghao Li 已提交
321
  return code;
M
Minghao Li 已提交
322
}
M
Minghao Li 已提交
323

M
Minghao Li 已提交
324 325 326 327
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;

M
Minghao Li 已提交
328 329
  if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) {
    SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
M
Minghao Li 已提交
330 331 332
    ASSERT(pWalHandle != NULL);

    int32_t code = walReadWithHandle(pWalHandle, index);
M
Minghao Li 已提交
333
    if (code != 0) {
M
Minghao Li 已提交
334 335 336 337 338 339
      int32_t     err = terrno;
      const char* errStr = tstrerror(err);
      int32_t     linuxErr = errno;
      const char* linuxErrMsg = strerror(errno);
      sError("walReadWithHandle error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
             linuxErrMsg);
M
Minghao Li 已提交
340
      ASSERT(0);
M
Minghao Li 已提交
341 342
    }
    // assert(walReadWithHandle(pWalHandle, index) == 0);
M
Minghao Li 已提交
343

L
Liu Jicong 已提交
344
    SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
M
Minghao Li 已提交
345 346
    assert(pEntry != NULL);

347
    pEntry->msgType = TDMT_SYNC_CLIENT_REQUEST;
M
Minghao Li 已提交
348 349 350 351 352
    pEntry->originalRpcType = pWalHandle->pHead->head.msgType;
    pEntry->seqNum = pWalHandle->pHead->head.syncMeta.seqNum;
    pEntry->isWeak = pWalHandle->pHead->head.syncMeta.isWeek;
    pEntry->term = pWalHandle->pHead->head.syncMeta.term;
    pEntry->index = index;
L
Liu Jicong 已提交
353 354
    assert(pEntry->dataLen == pWalHandle->pHead->head.bodyLen);
    memcpy(pEntry->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);
M
Minghao Li 已提交
355

M
Minghao Li 已提交
356 357
    // need to hold, do not new every time!!
    walCloseReadHandle(pWalHandle);
M
Minghao Li 已提交
358
    return pEntry;
M
Minghao Li 已提交
359

M
Minghao Li 已提交
360 361 362
  } else {
    return NULL;
  }
M
Minghao Li 已提交
363
}
M
Minghao Li 已提交
364

M
Minghao Li 已提交
365 366 367
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
M
Minghao Li 已提交
368
  // assert(walRollback(pWal, fromIndex) == 0);
M
Minghao Li 已提交
369 370
  int32_t code = walRollback(pWal, fromIndex);
  if (code != 0) {
M
Minghao Li 已提交
371 372 373 374 375 376
    int32_t     err = terrno;
    const char* errStr = tstrerror(err);
    int32_t     linuxErr = errno;
    const char* linuxErrMsg = strerror(errno);
    sError("walRollback error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
           linuxErrMsg);
M
Minghao Li 已提交
377
    ASSERT(0);
M
Minghao Li 已提交
378
  }
M
Minghao Li 已提交
379
  return 0;
M
Minghao Li 已提交
380
}
M
Minghao Li 已提交
381

M
Minghao Li 已提交
382
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) {
M
Minghao Li 已提交
383 384
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
M
Minghao Li 已提交
385
  SyncIndex          lastIndex = walGetLastVer(pWal);
M
Minghao Li 已提交
386 387
  return lastIndex;
}
M
Minghao Li 已提交
388

M
Minghao Li 已提交
389
SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {
M
Minghao Li 已提交
390
  SyncTerm        lastTerm = 0;
M
Minghao Li 已提交
391
  SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore);
M
Minghao Li 已提交
392 393
  if (pLastEntry != NULL) {
    lastTerm = pLastEntry->term;
wafwerar's avatar
wafwerar 已提交
394
    taosMemoryFree(pLastEntry);
M
Minghao Li 已提交
395
  }
M
Minghao Li 已提交
396 397
  return lastTerm;
}
M
Minghao Li 已提交
398

M
Minghao Li 已提交
399 400 401
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
M
Minghao Li 已提交
402
  // assert(walCommit(pWal, index) == 0);
M
Minghao Li 已提交
403 404
  int32_t code = walCommit(pWal, index);
  if (code != 0) {
M
Minghao Li 已提交
405 406 407 408
    int32_t     err = terrno;
    const char* errStr = tstrerror(err);
    int32_t     linuxErr = errno;
    const char* linuxErrMsg = strerror(errno);
409
    sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg);
M
Minghao Li 已提交
410
    ASSERT(0);
M
Minghao Li 已提交
411
  }
M
Minghao Li 已提交
412
  return 0;
M
Minghao Li 已提交
413
}
M
Minghao Li 已提交
414

M
Minghao Li 已提交
415 416 417 418 419 420 421 422 423
SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pData = pLogStore->data;
  return pData->pSyncNode->commitIndex;
}

SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  SyncIndex          lastIndex = walGetLastVer(pWal);
M
Minghao Li 已提交
424 425 426 427 428

  SSyncRaftEntry* pEntry = NULL;
  if (lastIndex > 0) {
    pEntry = logStoreGetEntry(pLogStore, lastIndex);
  }
M
Minghao Li 已提交
429 430
  return pEntry;
}
M
Minghao Li 已提交
431

M
Minghao Li 已提交
432
cJSON* logStore2Json(SSyncLogStore* pLogStore) {
433
  char               u64buf[128] = {0};
M
Minghao Li 已提交
434 435
  SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data;
  cJSON*             pRoot = cJSON_CreateObject();
M
Minghao Li 已提交
436 437 438 439 440 441 442

  if (pData != NULL && pData->pWal != NULL) {
    snprintf(u64buf, sizeof(u64buf), "%p", pData->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal);
    cJSON_AddStringToObject(pRoot, "pWal", u64buf);

M
Minghao Li 已提交
443 444
    snprintf(u64buf, sizeof(u64buf), "%ld", pData->beginIndex);
    cJSON_AddStringToObject(pRoot, "beginIndex", u64buf);
M
Minghao Li 已提交
445

M
Minghao Li 已提交
446 447 448
    SyncIndex endIndex = raftLogEndIndex(pLogStore);
    snprintf(u64buf, sizeof(u64buf), "%ld", endIndex);
    cJSON_AddStringToObject(pRoot, "endIndex", u64buf);
M
Minghao Li 已提交
449

M
Minghao Li 已提交
450 451
    int32_t count = raftLogEntryCount(pLogStore);
    cJSON_AddNumberToObject(pRoot, "entryCount", count);
M
Minghao Li 已提交
452

M
Minghao Li 已提交
453 454 455 456 457 458 459 460 461 462 463
    snprintf(u64buf, sizeof(u64buf), "%ld", raftLogWriteIndex(pLogStore));
    cJSON_AddStringToObject(pRoot, "WriteIndex", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%d", raftLogIsEmpty(pLogStore));
    cJSON_AddStringToObject(pRoot, "IsEmpty", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%ld", raftLogLastIndex(pLogStore));
    cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%lu", raftLogLastTerm(pLogStore));
    cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);

M
Minghao Li 已提交
464 465
    cJSON* pEntries = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "pEntries", pEntries);
M
Minghao Li 已提交
466

M
Minghao Li 已提交
467
    for (SyncIndex i = pData->beginIndex; i <= endIndex; ++i) {
M
Minghao Li 已提交
468 469 470 471
      SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i);
      cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry));
      syncEntryDestory(pEntry);
    }
M
Minghao Li 已提交
472 473 474 475 476 477
  }

  cJSON* pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncLogStore", pRoot);
  return pJson;
}
M
Minghao Li 已提交
478 479 480 481 482 483

char* logStore2Str(SSyncLogStore* pLogStore) {
  cJSON* pJson = logStore2Json(pLogStore);
  char*  serialized = cJSON_Print(pJson);
  cJSON_Delete(pJson);
  return serialized;
M
Minghao Li 已提交
484 485
}

M
Minghao Li 已提交
486
cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore) {
487
  char               u64buf[128] = {0};
M
Minghao Li 已提交
488 489 490 491 492 493 494 495
  SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data;
  cJSON*             pRoot = cJSON_CreateObject();

  if (pData != NULL && pData->pWal != NULL) {
    snprintf(u64buf, sizeof(u64buf), "%p", pData->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal);
    cJSON_AddStringToObject(pRoot, "pWal", u64buf);
M
Minghao Li 已提交
496 497 498

    snprintf(u64buf, sizeof(u64buf), "%ld", pData->beginIndex);
    cJSON_AddStringToObject(pRoot, "beginIndex", u64buf);
M
Minghao Li 已提交
499 500 501 502 503 504 505

    SyncIndex endIndex = raftLogEndIndex(pLogStore);
    snprintf(u64buf, sizeof(u64buf), "%ld", endIndex);
    cJSON_AddStringToObject(pRoot, "endIndex", u64buf);

    int32_t count = raftLogEntryCount(pLogStore);
    cJSON_AddNumberToObject(pRoot, "entryCount", count);
M
Minghao Li 已提交
506 507 508 509 510 511 512 513 514 515 516

    snprintf(u64buf, sizeof(u64buf), "%ld", raftLogWriteIndex(pLogStore));
    cJSON_AddStringToObject(pRoot, "WriteIndex", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%d", raftLogIsEmpty(pLogStore));
    cJSON_AddStringToObject(pRoot, "IsEmpty", u64buf);

    snprintf(u64buf, sizeof(u64buf), "%ld", raftLogLastIndex(pLogStore));
    cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%lu", raftLogLastTerm(pLogStore));
    cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);
M
Minghao Li 已提交
517 518 519 520 521 522 523 524 525 526 527 528 529 530
  }

  cJSON* pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncLogStoreSimple", pRoot);
  return pJson;
}

char* logStoreSimple2Str(SSyncLogStore* pLogStore) {
  cJSON* pJson = logStoreSimple2Json(pLogStore);
  char*  serialized = cJSON_Print(pJson);
  cJSON_Delete(pJson);
  return serialized;
}

M
Minghao Li 已提交
531 532 533 534 535 536
SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  return walGetFirstVer(pWal);
}

M
Minghao Li 已提交
537
// for debug -----------------
M
Minghao Li 已提交
538
void logStorePrint(SSyncLogStore* pLogStore) {
M
Minghao Li 已提交
539 540 541
  char* serialized = logStore2Str(pLogStore);
  printf("logStorePrint | len:%lu | %s \n", strlen(serialized), serialized);
  fflush(NULL);
wafwerar's avatar
wafwerar 已提交
542
  taosMemoryFree(serialized);
M
Minghao Li 已提交
543 544 545 546
}

void logStorePrint2(char* s, SSyncLogStore* pLogStore) {
  char* serialized = logStore2Str(pLogStore);
M
Minghao Li 已提交
547
  printf("logStorePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
M
Minghao Li 已提交
548
  fflush(NULL);
wafwerar's avatar
wafwerar 已提交
549
  taosMemoryFree(serialized);
M
Minghao Li 已提交
550
}
M
Minghao Li 已提交
551

M
Minghao Li 已提交
552 553
void logStoreLog(SSyncLogStore* pLogStore) {
  char* serialized = logStore2Str(pLogStore);
M
Minghao Li 已提交
554
  sTraceLong("logStoreLog | len:%lu | %s", strlen(serialized), serialized);
wafwerar's avatar
wafwerar 已提交
555
  taosMemoryFree(serialized);
M
Minghao Li 已提交
556 557 558 559
}

void logStoreLog2(char* s, SSyncLogStore* pLogStore) {
  char* serialized = logStore2Str(pLogStore);
M
Minghao Li 已提交
560
  sTraceLong("logStoreLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
wafwerar's avatar
wafwerar 已提交
561
  taosMemoryFree(serialized);
562
}
M
Minghao Li 已提交
563 564 565 566 567 568

// for debug -----------------
void logStoreSimplePrint(SSyncLogStore* pLogStore) {
  char* serialized = logStoreSimple2Str(pLogStore);
  printf("logStoreSimplePrint | len:%lu | %s \n", strlen(serialized), serialized);
  fflush(NULL);
wafwerar's avatar
wafwerar 已提交
569
  taosMemoryFree(serialized);
M
Minghao Li 已提交
570 571 572 573 574 575
}

void logStoreSimplePrint2(char* s, SSyncLogStore* pLogStore) {
  char* serialized = logStoreSimple2Str(pLogStore);
  printf("logStoreSimplePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
  fflush(NULL);
wafwerar's avatar
wafwerar 已提交
576
  taosMemoryFree(serialized);
M
Minghao Li 已提交
577 578 579 580 581
}

void logStoreSimpleLog(SSyncLogStore* pLogStore) {
  char* serialized = logStoreSimple2Str(pLogStore);
  sTrace("logStoreSimpleLog | len:%lu | %s", strlen(serialized), serialized);
wafwerar's avatar
wafwerar 已提交
582
  taosMemoryFree(serialized);
M
Minghao Li 已提交
583 584 585
}

void logStoreSimpleLog2(char* s, SSyncLogStore* pLogStore) {
M
Minghao Li 已提交
586 587 588 589 590
  if (gRaftDetailLog) {
    char* serialized = logStoreSimple2Str(pLogStore);
    sTrace("logStoreSimpleLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
    taosMemoryFree(serialized);
  }
L
Liu Jicong 已提交
591
}