syncRaftLog.c 6.1 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncRaftLog.h"
M
Minghao Li 已提交
17
#include "wal.h"
M
Minghao Li 已提交
18

M
Minghao Li 已提交
19 20 21
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
  SSyncLogStore* pLogStore = malloc(sizeof(SSyncLogStore));
  assert(pLogStore != NULL);
M
Minghao Li 已提交
22

M
Minghao Li 已提交
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
  pLogStore->data = malloc(sizeof(SSyncLogStoreData));
  assert(pLogStore->data != NULL);

  SSyncLogStoreData* pData = pLogStore->data;
  pData->pSyncNode = pSyncNode;
  pData->pWal = pSyncNode->pWal;

  pLogStore->appendEntry = logStoreAppendEntry;
  pLogStore->getEntry = logStoreGetEntry;
  pLogStore->truncate = logStoreTruncate;
  pLogStore->getLastIndex = logStoreLastIndex;
  pLogStore->getLastTerm = logStoreLastTerm;
  pLogStore->updateCommitIndex = logStoreUpdateCommitIndex;
  pLogStore->getCommitIndex = logStoreGetCommitIndex;
}

void logStoreDestory(SSyncLogStore* pLogStore) {
  if (pLogStore != NULL) {
    free(pLogStore->data);
    free(pLogStore);
  }
}

M
Minghao Li 已提交
46 47 48 49
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;

M
Minghao Li 已提交
50 51 52 53 54
  assert(pEntry->index == logStoreLastIndex(pLogStore) + 1);
  uint32_t len;
  char*    serialized = syncEntrySerialize(pEntry, &len);
  assert(serialized != NULL);

M
Minghao Li 已提交
55 56 57
  int code;
  code = walWrite(pWal, pEntry->index, pEntry->msgType, serialized, len);
  assert(code == 0);
M
Minghao Li 已提交
58

M
Minghao Li 已提交
59
  walFsync(pWal, true);
M
Minghao Li 已提交
60
  free(serialized);
M
Minghao Li 已提交
61
}
M
Minghao Li 已提交
62

M
Minghao Li 已提交
63 64 65
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
M
Minghao Li 已提交
66
  SSyncRaftEntry*    pEntry = NULL;
M
Minghao Li 已提交
67

M
Minghao Li 已提交
68 69 70 71 72 73 74 75 76
  if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) {
    SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
    walReadWithHandle(pWalHandle, index);
    pEntry = syncEntryDeserialize(pWalHandle->pHead->head.body, pWalHandle->pHead->head.len);
    assert(pEntry != NULL);

    // need to hold, do not new every time!!
    walCloseReadHandle(pWalHandle);
  }
M
Minghao Li 已提交
77 78 79

  return pEntry;
}
M
Minghao Li 已提交
80

M
Minghao Li 已提交
81 82 83 84 85
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  walRollback(pWal, fromIndex);
}
M
Minghao Li 已提交
86

M
Minghao Li 已提交
87
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) {
M
Minghao Li 已提交
88 89
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
M
Minghao Li 已提交
90
  SyncIndex          lastIndex = walGetLastVer(pWal);
M
Minghao Li 已提交
91 92
  return lastIndex;
}
M
Minghao Li 已提交
93

M
Minghao Li 已提交
94
SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {
M
Minghao Li 已提交
95
  SyncTerm        lastTerm = 0;
M
Minghao Li 已提交
96
  SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore);
M
Minghao Li 已提交
97 98 99 100
  if (pLastEntry != NULL) {
    lastTerm = pLastEntry->term;
    free(pLastEntry);
  }
M
Minghao Li 已提交
101 102
  return lastTerm;
}
M
Minghao Li 已提交
103

M
Minghao Li 已提交
104 105 106 107 108
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  walCommit(pWal, index);
}
M
Minghao Li 已提交
109

M
Minghao Li 已提交
110 111 112 113 114 115 116 117 118
SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pData = pLogStore->data;
  return pData->pSyncNode->commitIndex;
}

SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;
  SyncIndex          lastIndex = walGetLastVer(pWal);
M
Minghao Li 已提交
119 120 121 122 123

  SSyncRaftEntry* pEntry = NULL;
  if (lastIndex > 0) {
    pEntry = logStoreGetEntry(pLogStore, lastIndex);
  }
M
Minghao Li 已提交
124 125
  return pEntry;
}
M
Minghao Li 已提交
126

M
Minghao Li 已提交
127
cJSON* logStore2Json(SSyncLogStore* pLogStore) {
M
Minghao Li 已提交
128
  char               u64buf[128];
M
Minghao Li 已提交
129 130
  SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data;
  cJSON*             pRoot = cJSON_CreateObject();
M
Minghao Li 已提交
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149

  if (pData != NULL && pData->pWal != NULL) {
    snprintf(u64buf, sizeof(u64buf), "%p", pData->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal);
    cJSON_AddStringToObject(pRoot, "pWal", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%ld", logStoreLastIndex(pLogStore));
    cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastTerm(pLogStore));
    cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);

    cJSON* pEntries = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "pEntries", pEntries);
    SyncIndex lastIndex = logStoreLastIndex(pLogStore);
    for (SyncIndex i = 0; i <= lastIndex; ++i) {
      SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i);
      cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry));
      syncEntryDestory(pEntry);
    }
M
Minghao Li 已提交
150 151 152 153 154 155
  }

  cJSON* pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncLogStore", pRoot);
  return pJson;
}
M
Minghao Li 已提交
156 157 158 159 160 161

char* logStore2Str(SSyncLogStore* pLogStore) {
  cJSON* pJson = logStore2Json(pLogStore);
  char*  serialized = cJSON_Print(pJson);
  cJSON_Delete(pJson);
  return serialized;
M
Minghao Li 已提交
162 163
}

M
Minghao Li 已提交
164
// for debug -----------------
M
Minghao Li 已提交
165
void logStorePrint(SSyncLogStore* pLogStore) {
M
Minghao Li 已提交
166 167 168 169 170 171 172 173 174 175 176 177
  char* serialized = logStore2Str(pLogStore);
  printf("logStorePrint | len:%lu | %s \n", strlen(serialized), serialized);
  fflush(NULL);
  free(serialized);
}

void logStorePrint2(char* s, SSyncLogStore* pLogStore) {
  char* serialized = logStore2Str(pLogStore);
  printf("logStorePrint | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
  fflush(NULL);
  free(serialized);
}
M
Minghao Li 已提交
178

M
Minghao Li 已提交
179 180 181 182 183 184 185 186 187 188
void logStoreLog(SSyncLogStore* pLogStore) {
  char* serialized = logStore2Str(pLogStore);
  sTrace("logStorePrint | len:%lu | %s", strlen(serialized), serialized);
  free(serialized);
}

void logStoreLog2(char* s, SSyncLogStore* pLogStore) {
  char* serialized = logStore2Str(pLogStore);
  sTrace("logStorePrint | len:%lu | %s | %s", strlen(serialized), s, serialized);
  free(serialized);
M
Minghao Li 已提交
189
}