syncRaftLog.c 19.8 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncRaftLog.h"
M
Minghao Li 已提交
17
#include "wal.h"
M
Minghao Li 已提交
18

M
Minghao Li 已提交
19 20 21 22
// refactor, log[0 .. n] ==> log[m .. n]
static int32_t   raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex beginIndex);
static SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore);
static SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore);
M
Minghao Li 已提交
23
static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore);
M
Minghao Li 已提交
24 25 26 27 28 29 30 31 32 33 34 35
static bool      raftLogIsEmpty(struct SSyncLogStore* pLogStore);
static int32_t   raftLogEntryCount(struct SSyncLogStore* pLogStore);
static bool      raftLogInRange(struct SSyncLogStore* pLogStore, SyncIndex index);
static SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore);
static SyncTerm  raftLogLastTerm(struct SSyncLogStore* pLogStore);
static int32_t   raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
static int32_t   raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry);
static int32_t   raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);

static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry);

//-------------------------------
36
static SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore);
M
Minghao Li 已提交
37 38 39 40 41 42 43
static SyncIndex       logStoreLastIndex(SSyncLogStore* pLogStore);
static SyncTerm        logStoreLastTerm(SSyncLogStore* pLogStore);
static SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index);
static int32_t         logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
static int32_t         logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex);
static int32_t         logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index);
static SyncIndex       logStoreGetCommitIndex(SSyncLogStore* pLogStore);
44

M
Minghao Li 已提交
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
// refactor, log[0 .. n] ==> log[m .. n]
static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex beginIndex) {
  // if beginIndex == 0, donot need call this funciton
  ASSERT(beginIndex > 0);

  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  pData->beginIndex = beginIndex;
  walRestoreFromSnapshot(pWal, beginIndex - 1);
  return 0;
}

static SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  return pData->beginIndex;
}

static SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore) { return raftLogLastIndex(pLogStore); }

static bool raftLogIsEmpty(struct SSyncLogStore* pLogStore) {
  SyncIndex beginIndex = raftLogBeginIndex(pLogStore);
  SyncIndex endIndex = raftLogEndIndex(pLogStore);
  return (endIndex >= beginIndex);
}

static int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore) {
  SyncIndex beginIndex = raftLogBeginIndex(pLogStore);
  SyncIndex endIndex = raftLogEndIndex(pLogStore);
M
Minghao Li 已提交
74
  int32_t   count = endIndex - beginIndex + 1;
M
Minghao Li 已提交
75 76 77 78 79 80 81 82 83 84 85 86 87 88
  return count > 0 ? count : 0;
}

static bool raftLogInRange(struct SSyncLogStore* pLogStore, SyncIndex index) {
  SyncIndex beginIndex = raftLogBeginIndex(pLogStore);
  SyncIndex endIndex = raftLogEndIndex(pLogStore);
  if (index >= beginIndex && index <= endIndex) {
    return true;
  } else {
    return false;
  }
}

static SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore) {
M
Minghao Li 已提交
89
  SyncIndex          lastIndex;
M
Minghao Li 已提交
90 91 92
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  SyncIndex          lastVer = walGetLastVer(pWal);
M
Minghao Li 已提交
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
  SyncIndex          firstVer = walGetFirstVer(pWal);

  if (lastVer < firstVer) {
    // no record
    lastIndex = -1;

  } else {
    if (firstVer >= 0) {
      lastIndex = lastVer;
    } else if (firstVer == -1) {
      lastIndex = -1;
    } else {
      ASSERT(0);
    }
  }

  return lastIndex;
}

static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  SyncIndex          lastVer = walGetLastVer(pWal);
  return lastVer + 1;
M
Minghao Li 已提交
117 118 119 120
}

static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
  SyncTerm lastTerm = 0;
M
Minghao Li 已提交
121
  if (raftLogEntryCount(pLogStore) == 0) {
M
Minghao Li 已提交
122 123 124 125 126
    lastTerm = 0;
  } else {
    SSyncRaftEntry* pLastEntry;
    int32_t         code = raftLogGetLastEntry(pLogStore, &pLastEntry);
    ASSERT(code == 0);
M
Minghao Li 已提交
127 128 129 130
    if (pLastEntry != NULL) {
      lastTerm = pLastEntry->term;
      taosMemoryFree(pLastEntry);
    }
M
Minghao Li 已提交
131 132 133 134 135 136 137 138
  }
  return lastTerm;
}

static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;

M
Minghao Li 已提交
139 140
  SyncIndex writeIndex = raftLogWriteIndex(pLogStore);
  ASSERT(pEntry->index == writeIndex);
M
Minghao Li 已提交
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158

  int          code = 0;
  SSyncLogMeta syncMeta;
  syncMeta.isWeek = pEntry->isWeak;
  syncMeta.seqNum = pEntry->seqNum;
  syncMeta.term = pEntry->term;
  code = walWriteWithSyncInfo(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
  if (code != 0) {
    int32_t     err = terrno;
    const char* errStr = tstrerror(err);
    int32_t     linuxErr = errno;
    const char* linuxErrMsg = strerror(errno);
    sError("raftLogAppendEntry error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
           linuxErrMsg);
    ASSERT(0);
  }

  walFsync(pWal, true);
159 160 161

  sTrace("sync event write wal: %ld", pEntry->index);

M
Minghao Li 已提交
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
  return code;
}

static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  int32_t            code;

  *ppEntry = NULL;
  if (raftLogInRange(pLogStore, index)) {
    SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
    ASSERT(pWalHandle != NULL);

    code = walReadWithHandle(pWalHandle, index);
    if (code != 0) {
      int32_t     err = terrno;
      const char* errStr = tstrerror(err);
      int32_t     linuxErr = errno;
      const char* linuxErrMsg = strerror(errno);
      sError("raftLogGetEntry error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
             linuxErrMsg);
      walCloseReadHandle(pWalHandle);
      ASSERT(0);
      return code;
    }

    *ppEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
    ASSERT(*ppEntry != NULL);
190
    (*ppEntry)->msgType = TDMT_SYNC_CLIENT_REQUEST;
M
Minghao Li 已提交
191 192 193 194 195 196 197 198 199 200 201 202 203
    (*ppEntry)->originalRpcType = pWalHandle->pHead->head.msgType;
    (*ppEntry)->seqNum = pWalHandle->pHead->head.syncMeta.seqNum;
    (*ppEntry)->isWeak = pWalHandle->pHead->head.syncMeta.isWeek;
    (*ppEntry)->term = pWalHandle->pHead->head.syncMeta.term;
    (*ppEntry)->index = index;
    ASSERT((*ppEntry)->dataLen == pWalHandle->pHead->head.bodyLen);
    memcpy((*ppEntry)->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);

    // need to hold, do not new every time!!
    walCloseReadHandle(pWalHandle);

  } else {
    // index not in range
M
Minghao Li 已提交
204
    code = 0;
M
Minghao Li 已提交
205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
  }

  return code;
}

static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  int32_t            code = walRollback(pWal, fromIndex);
  if (code != 0) {
    int32_t     err = terrno;
    const char* errStr = tstrerror(err);
    int32_t     linuxErr = errno;
    const char* linuxErrMsg = strerror(errno);
    sError("raftLogTruncate error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
           linuxErrMsg);
    ASSERT(0);
  }
  return code;
}

static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry) {
M
Minghao Li 已提交
227
  *ppLastEntry = NULL;
M
Minghao Li 已提交
228
  if (raftLogEntryCount(pLogStore) == 0) {
M
Minghao Li 已提交
229
    return 0;
M
Minghao Li 已提交
230 231 232 233 234 235 236
  }
  SyncIndex lastIndex = raftLogLastIndex(pLogStore);
  int32_t   code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry);
  return code;
}

//-------------------------------
M
Minghao Li 已提交
237
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
wafwerar's avatar
wafwerar 已提交
238
  SSyncLogStore* pLogStore = taosMemoryMalloc(sizeof(SSyncLogStore));
M
Minghao Li 已提交
239
  assert(pLogStore != NULL);
M
Minghao Li 已提交
240

wafwerar's avatar
wafwerar 已提交
241
  pLogStore->data = taosMemoryMalloc(sizeof(SSyncLogStoreData));
M
Minghao Li 已提交
242 243 244 245 246 247
  assert(pLogStore->data != NULL);

  SSyncLogStoreData* pData = pLogStore->data;
  pData->pSyncNode = pSyncNode;
  pData->pWal = pSyncNode->pWal;

M
Minghao Li 已提交
248 249 250 251 252 253 254 255 256 257
  SyncIndex firstVer = walGetFirstVer(pData->pWal);
  SyncIndex lastVer = walGetLastVer(pData->pWal);
  if (firstVer >= 0) {
    pData->beginIndex = firstVer;
  } else if (firstVer == -1) {
    pData->beginIndex = lastVer + 1;
  } else {
    ASSERT(0);
  }

M
Minghao Li 已提交
258 259 260 261 262 263 264
  pLogStore->appendEntry = logStoreAppendEntry;
  pLogStore->getEntry = logStoreGetEntry;
  pLogStore->truncate = logStoreTruncate;
  pLogStore->getLastIndex = logStoreLastIndex;
  pLogStore->getLastTerm = logStoreLastTerm;
  pLogStore->updateCommitIndex = logStoreUpdateCommitIndex;
  pLogStore->getCommitIndex = logStoreGetCommitIndex;
M
Minghao Li 已提交
265 266 267 268 269 270 271 272 273 274 275 276

  pLogStore->syncLogSetBeginIndex = raftLogSetBeginIndex;
  pLogStore->syncLogBeginIndex = raftLogBeginIndex;
  pLogStore->syncLogEndIndex = raftLogEndIndex;
  pLogStore->syncLogIsEmpty = raftLogIsEmpty;
  pLogStore->syncLogEntryCount = raftLogEntryCount;
  pLogStore->syncLogInRange = raftLogInRange;
  pLogStore->syncLogLastIndex = raftLogLastIndex;
  pLogStore->syncLogLastTerm = raftLogLastTerm;
  pLogStore->syncLogAppendEntry = raftLogAppendEntry;
  pLogStore->syncLogGetEntry = raftLogGetEntry;
  pLogStore->syncLogTruncate = raftLogTruncate;
M
Minghao Li 已提交
277
  pLogStore->syncLogWriteIndex = raftLogWriteIndex;
M
Minghao Li 已提交
278

M
Minghao Li 已提交
279
  return pLogStore;
M
Minghao Li 已提交
280 281 282 283
}

void logStoreDestory(SSyncLogStore* pLogStore) {
  if (pLogStore != NULL) {
wafwerar's avatar
wafwerar 已提交
284 285
    taosMemoryFree(pLogStore->data);
    taosMemoryFree(pLogStore);
M
Minghao Li 已提交
286 287 288
  }
}

M
Minghao Li 已提交
289
//-------------------------------
M
Minghao Li 已提交
290 291 292 293
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;

M
Minghao Li 已提交
294 295
  SyncIndex lastIndex = logStoreLastIndex(pLogStore);
  assert(pEntry->index == lastIndex + 1);
M
Minghao Li 已提交
296

M
Minghao Li 已提交
297 298 299 300 301 302
  int          code = 0;
  SSyncLogMeta syncMeta;
  syncMeta.isWeek = pEntry->isWeak;
  syncMeta.seqNum = pEntry->seqNum;
  syncMeta.term = pEntry->term;
  code = walWriteWithSyncInfo(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
M
Minghao Li 已提交
303
  if (code != 0) {
M
Minghao Li 已提交
304 305 306 307 308 309
    int32_t     err = terrno;
    const char* errStr = tstrerror(err);
    int32_t     linuxErr = errno;
    const char* linuxErrMsg = strerror(errno);
    sError("walWriteWithSyncInfo error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
           linuxErrMsg);
M
Minghao Li 已提交
310
    ASSERT(0);
M
Minghao Li 已提交
311 312
  }
  // assert(code == 0);
M
Minghao Li 已提交
313

M
Minghao Li 已提交
314
  walFsync(pWal, true);
315 316

  sTrace("sync event old write wal: %ld", pEntry->index);
M
Minghao Li 已提交
317
  return code;
M
Minghao Li 已提交
318
}
M
Minghao Li 已提交
319

M
Minghao Li 已提交
320 321 322 323
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;

M
Minghao Li 已提交
324 325
  if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) {
    SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
M
Minghao Li 已提交
326 327 328
    ASSERT(pWalHandle != NULL);

    int32_t code = walReadWithHandle(pWalHandle, index);
M
Minghao Li 已提交
329
    if (code != 0) {
M
Minghao Li 已提交
330 331 332 333 334 335
      int32_t     err = terrno;
      const char* errStr = tstrerror(err);
      int32_t     linuxErr = errno;
      const char* linuxErrMsg = strerror(errno);
      sError("walReadWithHandle error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
             linuxErrMsg);
M
Minghao Li 已提交
336
      ASSERT(0);
M
Minghao Li 已提交
337 338
    }
    // assert(walReadWithHandle(pWalHandle, index) == 0);
M
Minghao Li 已提交
339

L
Liu Jicong 已提交
340
    SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
M
Minghao Li 已提交
341 342
    assert(pEntry != NULL);

343
    pEntry->msgType = TDMT_SYNC_CLIENT_REQUEST;
M
Minghao Li 已提交
344 345 346 347 348
    pEntry->originalRpcType = pWalHandle->pHead->head.msgType;
    pEntry->seqNum = pWalHandle->pHead->head.syncMeta.seqNum;
    pEntry->isWeak = pWalHandle->pHead->head.syncMeta.isWeek;
    pEntry->term = pWalHandle->pHead->head.syncMeta.term;
    pEntry->index = index;
L
Liu Jicong 已提交
349 350
    assert(pEntry->dataLen == pWalHandle->pHead->head.bodyLen);
    memcpy(pEntry->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);
M
Minghao Li 已提交
351

M
Minghao Li 已提交
352 353
    // need to hold, do not new every time!!
    walCloseReadHandle(pWalHandle);
M
Minghao Li 已提交
354
    return pEntry;
M
Minghao Li 已提交
355

M
Minghao Li 已提交
356 357 358
  } else {
    return NULL;
  }
M
Minghao Li 已提交
359
}
M
Minghao Li 已提交
360

M
Minghao Li 已提交
361 362 363
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
M
Minghao Li 已提交
364
  // assert(walRollback(pWal, fromIndex) == 0);
M
Minghao Li 已提交
365 366
  int32_t code = walRollback(pWal, fromIndex);
  if (code != 0) {
M
Minghao Li 已提交
367 368 369 370 371 372
    int32_t     err = terrno;
    const char* errStr = tstrerror(err);
    int32_t     linuxErr = errno;
    const char* linuxErrMsg = strerror(errno);
    sError("walRollback error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
           linuxErrMsg);
M
Minghao Li 已提交
373
    ASSERT(0);
M
Minghao Li 已提交
374
  }
M
Minghao Li 已提交
375
  return 0;
M
Minghao Li 已提交
376
}
M
Minghao Li 已提交
377

M
Minghao Li 已提交
378
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) {
M
Minghao Li 已提交
379 380
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
M
Minghao Li 已提交
381
  SyncIndex          lastIndex = walGetLastVer(pWal);
M
Minghao Li 已提交
382 383
  return lastIndex;
}
M
Minghao Li 已提交
384

M
Minghao Li 已提交
385
SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {
M
Minghao Li 已提交
386
  SyncTerm        lastTerm = 0;
M
Minghao Li 已提交
387
  SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore);
M
Minghao Li 已提交
388 389
  if (pLastEntry != NULL) {
    lastTerm = pLastEntry->term;
wafwerar's avatar
wafwerar 已提交
390
    taosMemoryFree(pLastEntry);
M
Minghao Li 已提交
391
  }
M
Minghao Li 已提交
392 393
  return lastTerm;
}
M
Minghao Li 已提交
394

M
Minghao Li 已提交
395 396 397
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
M
Minghao Li 已提交
398
  // assert(walCommit(pWal, index) == 0);
M
Minghao Li 已提交
399 400
  int32_t code = walCommit(pWal, index);
  if (code != 0) {
M
Minghao Li 已提交
401 402 403 404
    int32_t     err = terrno;
    const char* errStr = tstrerror(err);
    int32_t     linuxErr = errno;
    const char* linuxErrMsg = strerror(errno);
405
    sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg);
M
Minghao Li 已提交
406
    ASSERT(0);
M
Minghao Li 已提交
407
  }
M
Minghao Li 已提交
408
  return 0;
M
Minghao Li 已提交
409
}
M
Minghao Li 已提交
410

M
Minghao Li 已提交
411 412 413 414 415 416 417 418 419
SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pData = pLogStore->data;
  return pData->pSyncNode->commitIndex;
}

SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  SyncIndex          lastIndex = walGetLastVer(pWal);
M
Minghao Li 已提交
420 421 422 423 424

  SSyncRaftEntry* pEntry = NULL;
  if (lastIndex > 0) {
    pEntry = logStoreGetEntry(pLogStore, lastIndex);
  }
M
Minghao Li 已提交
425 426
  return pEntry;
}
M
Minghao Li 已提交
427

M
Minghao Li 已提交
428
cJSON* logStore2Json(SSyncLogStore* pLogStore) {
429
  char               u64buf[128] = {0};
M
Minghao Li 已提交
430 431
  SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data;
  cJSON*             pRoot = cJSON_CreateObject();
M
Minghao Li 已提交
432 433 434 435 436 437

  if (pData != NULL && pData->pWal != NULL) {
    snprintf(u64buf, sizeof(u64buf), "%p", pData->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal);
    cJSON_AddStringToObject(pRoot, "pWal", u64buf);
M
Minghao Li 已提交
438
    snprintf(u64buf, sizeof(u64buf), "%ld", raftLogLastIndex(pLogStore));
M
Minghao Li 已提交
439
    cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
M
Minghao Li 已提交
440
    snprintf(u64buf, sizeof(u64buf), "%lu", raftLogLastTerm(pLogStore));
M
Minghao Li 已提交
441 442
    cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);

M
Minghao Li 已提交
443 444
    snprintf(u64buf, sizeof(u64buf), "%ld", pData->beginIndex);
    cJSON_AddStringToObject(pRoot, "beginIndex", u64buf);
M
Minghao Li 已提交
445

M
Minghao Li 已提交
446 447 448
    SyncIndex endIndex = raftLogEndIndex(pLogStore);
    snprintf(u64buf, sizeof(u64buf), "%ld", endIndex);
    cJSON_AddStringToObject(pRoot, "endIndex", u64buf);
M
Minghao Li 已提交
449

M
Minghao Li 已提交
450 451
    int32_t count = raftLogEntryCount(pLogStore);
    cJSON_AddNumberToObject(pRoot, "entryCount", count);
M
Minghao Li 已提交
452

M
Minghao Li 已提交
453 454
    cJSON* pEntries = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "pEntries", pEntries);
M
Minghao Li 已提交
455

M
Minghao Li 已提交
456
    for (SyncIndex i = pData->beginIndex; i <= endIndex; ++i) {
M
Minghao Li 已提交
457 458 459 460
      SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i);
      cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry));
      syncEntryDestory(pEntry);
    }
M
Minghao Li 已提交
461 462 463 464 465 466
  }

  cJSON* pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncLogStore", pRoot);
  return pJson;
}
M
Minghao Li 已提交
467 468 469 470 471 472

char* logStore2Str(SSyncLogStore* pLogStore) {
  cJSON* pJson = logStore2Json(pLogStore);
  char*  serialized = cJSON_Print(pJson);
  cJSON_Delete(pJson);
  return serialized;
M
Minghao Li 已提交
473 474
}

M
Minghao Li 已提交
475
cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore) {
476
  char               u64buf[128] = {0};
M
Minghao Li 已提交
477 478 479 480 481 482 483 484
  SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data;
  cJSON*             pRoot = cJSON_CreateObject();

  if (pData != NULL && pData->pWal != NULL) {
    snprintf(u64buf, sizeof(u64buf), "%p", pData->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal);
    cJSON_AddStringToObject(pRoot, "pWal", u64buf);
M
Minghao Li 已提交
485
    snprintf(u64buf, sizeof(u64buf), "%ld", raftLogLastIndex(pLogStore));
M
Minghao Li 已提交
486
    cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
M
Minghao Li 已提交
487
    snprintf(u64buf, sizeof(u64buf), "%lu", raftLogLastTerm(pLogStore));
M
Minghao Li 已提交
488
    cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);
M
Minghao Li 已提交
489 490 491

    snprintf(u64buf, sizeof(u64buf), "%ld", pData->beginIndex);
    cJSON_AddStringToObject(pRoot, "beginIndex", u64buf);
M
Minghao Li 已提交
492 493 494 495 496 497 498

    SyncIndex endIndex = raftLogEndIndex(pLogStore);
    snprintf(u64buf, sizeof(u64buf), "%ld", endIndex);
    cJSON_AddStringToObject(pRoot, "endIndex", u64buf);

    int32_t count = raftLogEntryCount(pLogStore);
    cJSON_AddNumberToObject(pRoot, "entryCount", count);
M
Minghao Li 已提交
499 500 501 502 503 504 505 506 507 508 509 510 511 512
  }

  cJSON* pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncLogStoreSimple", pRoot);
  return pJson;
}

char* logStoreSimple2Str(SSyncLogStore* pLogStore) {
  cJSON* pJson = logStoreSimple2Json(pLogStore);
  char*  serialized = cJSON_Print(pJson);
  cJSON_Delete(pJson);
  return serialized;
}

M
Minghao Li 已提交
513 514 515 516 517 518
SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  return walGetFirstVer(pWal);
}

M
Minghao Li 已提交
519
// for debug -----------------
M
Minghao Li 已提交
520
void logStorePrint(SSyncLogStore* pLogStore) {
M
Minghao Li 已提交
521 522 523
  char* serialized = logStore2Str(pLogStore);
  printf("logStorePrint | len:%lu | %s \n", strlen(serialized), serialized);
  fflush(NULL);
wafwerar's avatar
wafwerar 已提交
524
  taosMemoryFree(serialized);
M
Minghao Li 已提交
525 526 527 528
}

void logStorePrint2(char* s, SSyncLogStore* pLogStore) {
  char* serialized = logStore2Str(pLogStore);
M
Minghao Li 已提交
529
  printf("logStorePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
M
Minghao Li 已提交
530
  fflush(NULL);
wafwerar's avatar
wafwerar 已提交
531
  taosMemoryFree(serialized);
M
Minghao Li 已提交
532
}
M
Minghao Li 已提交
533

M
Minghao Li 已提交
534 535
void logStoreLog(SSyncLogStore* pLogStore) {
  char* serialized = logStore2Str(pLogStore);
M
Minghao Li 已提交
536
  sTraceLong("logStoreLog | len:%lu | %s", strlen(serialized), serialized);
wafwerar's avatar
wafwerar 已提交
537
  taosMemoryFree(serialized);
M
Minghao Li 已提交
538 539 540 541
}

void logStoreLog2(char* s, SSyncLogStore* pLogStore) {
  char* serialized = logStore2Str(pLogStore);
M
Minghao Li 已提交
542
  sTraceLong("logStoreLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
wafwerar's avatar
wafwerar 已提交
543
  taosMemoryFree(serialized);
544
}
M
Minghao Li 已提交
545 546 547 548 549 550

// for debug -----------------
void logStoreSimplePrint(SSyncLogStore* pLogStore) {
  char* serialized = logStoreSimple2Str(pLogStore);
  printf("logStoreSimplePrint | len:%lu | %s \n", strlen(serialized), serialized);
  fflush(NULL);
wafwerar's avatar
wafwerar 已提交
551
  taosMemoryFree(serialized);
M
Minghao Li 已提交
552 553 554 555 556 557
}

void logStoreSimplePrint2(char* s, SSyncLogStore* pLogStore) {
  char* serialized = logStoreSimple2Str(pLogStore);
  printf("logStoreSimplePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
  fflush(NULL);
wafwerar's avatar
wafwerar 已提交
558
  taosMemoryFree(serialized);
M
Minghao Li 已提交
559 560 561 562 563
}

void logStoreSimpleLog(SSyncLogStore* pLogStore) {
  char* serialized = logStoreSimple2Str(pLogStore);
  sTrace("logStoreSimpleLog | len:%lu | %s", strlen(serialized), serialized);
wafwerar's avatar
wafwerar 已提交
564
  taosMemoryFree(serialized);
M
Minghao Li 已提交
565 566 567 568 569
}

void logStoreSimpleLog2(char* s, SSyncLogStore* pLogStore) {
  char* serialized = logStoreSimple2Str(pLogStore);
  sTrace("logStoreSimpleLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
wafwerar's avatar
wafwerar 已提交
570
  taosMemoryFree(serialized);
L
Liu Jicong 已提交
571
}