syncRaftLog.c 8.8 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncRaftLog.h"
M
Minghao Li 已提交
17
#include "wal.h"
M
Minghao Li 已提交
18

M
Minghao Li 已提交
19
// Allocate a log store for pSyncNode and fill in its operation table.
//
// The store's private data records the sync node and the node's WAL handle
// (pSyncNode->pWal). Allocation failures are caught by assert().
// Release the returned store with logStoreDestory().
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
  SSyncLogStore* pStore = taosMemoryMalloc(sizeof(*pStore));
  assert(pStore != NULL);

  SSyncLogStoreData* pStoreData = taosMemoryMalloc(sizeof(*pStoreData));
  assert(pStoreData != NULL);
  pStoreData->pSyncNode = pSyncNode;
  pStoreData->pWal = pSyncNode->pWal;
  pStore->data = pStoreData;

  // Operation table: each slot points at the matching function in this file.
  pStore->appendEntry = logStoreAppendEntry;
  pStore->getEntry = logStoreGetEntry;
  pStore->truncate = logStoreTruncate;
  pStore->getLastIndex = logStoreLastIndex;
  pStore->getLastTerm = logStoreLastTerm;
  pStore->updateCommitIndex = logStoreUpdateCommitIndex;
  pStore->getCommitIndex = logStoreGetCommitIndex;

  return pStore;
}

// Free a log store created by logStoreCreate(): first its private data, then
// the store itself. A NULL pLogStore is ignored.
void logStoreDestory(SSyncLogStore* pLogStore) {
  if (pLogStore == NULL) {
    return;
  }

  taosMemoryFree(pLogStore->data);
  taosMemoryFree(pLogStore);
}

M
Minghao Li 已提交
47 48 49 50
// Append pEntry to the WAL behind pLogStore and fsync it.
//
// pEntry->index must be exactly one past the current last index (asserted).
// The entry's isWeak / seqNum / term are written as the WAL record's sync
// metadata. Returns the code from walWriteWithSyncInfo(), which is asserted
// to be 0.
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  SWal*              pWal = pStoreData->pWal;

  // Entries may only be appended directly after the current tail.
  SyncIndex tailIndex = logStoreLastIndex(pLogStore);
  assert(pEntry->index == tailIndex + 1);

  SSyncLogMeta meta;
  meta.isWeek = pEntry->isWeak;
  meta.seqNum = pEntry->seqNum;
  meta.term = pEntry->term;

  int writeCode =
      walWriteWithSyncInfo(pWal, pEntry->index, pEntry->originalRpcType, meta, pEntry->data, pEntry->dataLen);
  assert(writeCode == 0);

  walFsync(pWal, true);
  return writeCode;
}
M
Minghao Li 已提交
65

M
Minghao Li 已提交
66 67 68 69
// Read the WAL record at `index` and return it as a newly built
// SSyncRaftEntry.
//
// Returns NULL when `index` is outside [SYNC_INDEX_BEGIN, last index]. When
// assert() is compiled out (NDEBUG), NULL is also returned if the read handle
// cannot be opened, the WAL read fails, or the entry cannot be built; with
// assertions enabled those cases abort as before.
// A non-NULL result is owned by the caller (callers in this file release it
// with syncEntryDestory() or taosMemoryFree()).
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;

  if (index < SYNC_INDEX_BEGIN || index > logStoreLastIndex(pLogStore)) {
    return NULL;
  }

  // TODO: keep one read handle around instead of opening a new one per call.
  SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
  assert(pWalHandle != NULL);
  if (pWalHandle == NULL) {
    return NULL;
  }

  // The read must not live inside assert(): with NDEBUG the whole expression
  // would vanish and pHead would be used without ever being filled.
  int32_t code = walReadWithHandle(pWalHandle, index);
  assert(code == 0);
  if (code != 0) {
    walCloseReadHandle(pWalHandle);
    return NULL;
  }

  SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
  assert(pEntry != NULL);
  if (pEntry == NULL) {
    walCloseReadHandle(pWalHandle);
    return NULL;
  }

  pEntry->msgType = TDMT_VND_SYNC_CLIENT_REQUEST;
  pEntry->originalRpcType = pWalHandle->pHead->head.msgType;
  pEntry->seqNum = pWalHandle->pHead->head.syncMeta.seqNum;
  pEntry->isWeak = pWalHandle->pHead->head.syncMeta.isWeek;
  pEntry->term = pWalHandle->pHead->head.syncMeta.term;
  pEntry->index = index;

  // syncEntryBuild() was given bodyLen, so the payload sizes must agree
  // before the body is copied into the entry.
  assert(pEntry->dataLen == pWalHandle->pHead->head.bodyLen);
  memcpy(pEntry->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);

  walCloseReadHandle(pWalHandle);
  return pEntry;
}
M
Minghao Li 已提交
94

M
Minghao Li 已提交
95 96 97
// Roll the WAL back to fromIndex via walRollback().
//
// Returns the walRollback() result, which is asserted to be 0. The call is
// made outside assert() so that it still runs when assertions are compiled
// out (NDEBUG); in that configuration a failure is reported through the
// return value instead of being hidden behind a constant 0.
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;

  int32_t code = walRollback(pWal, fromIndex);
  assert(code == 0);
  return code;
}
M
Minghao Li 已提交
101

M
Minghao Li 已提交
102
// Return the index of the newest log entry, as reported by walGetLastVer()
// on the store's WAL.
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  SyncIndex          tailIndex = walGetLastVer(pStoreData->pWal);
  return tailIndex;
}
M
Minghao Li 已提交
108

M
Minghao Li 已提交
109
// Return the term of the newest log entry, or 0 when logStoreGetLastEntry()
// yields no entry. The temporary entry is released before returning.
SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {
  SSyncRaftEntry* pTail = logStoreGetLastEntry(pLogStore);
  if (pTail == NULL) {
    return 0;
  }

  SyncTerm tailTerm = pTail->term;
  taosMemoryFree(pTail);
  return tailTerm;
}
M
Minghao Li 已提交
118

M
Minghao Li 已提交
119 120 121
// Commit the WAL up to `index` via walCommit().
//
// Returns the walCommit() result, which is asserted to be 0. The call is made
// outside assert() so that the commit still happens when assertions are
// compiled out (NDEBUG); in that configuration a failure is reported through
// the return value instead of being hidden behind a constant 0.
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;

  int32_t code = walCommit(pWal, index);
  assert(code == 0);
  return code;
}
M
Minghao Li 已提交
125

M
Minghao Li 已提交
126 127 128 129 130 131 132 133 134
// Return the commit index held by the sync node that owns this log store.
// The value is read from the node (pSyncNode->commitIndex), not from the WAL.
SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  SSyncNode*         pOwner = pStoreData->pSyncNode;
  return pOwner->commitIndex;
}

// Return the newest log entry, or NULL when the WAL's last version is not
// greater than 0. Ownership of a non-NULL result passes to the caller.
//
// NOTE(review): logStoreGetEntry() accepts indexes starting at
// SYNC_INDEX_BEGIN, while this guard requires lastIndex > 0 -- confirm the two
// lower bounds are meant to differ.
SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore) {
  SSyncLogStoreData* pStoreData = pLogStore->data;
  SyncIndex          tailIndex = walGetLastVer(pStoreData->pWal);

  if (tailIndex <= 0) {
    return NULL;
  }
  return logStoreGetEntry(pLogStore, tailIndex);
}
M
Minghao Li 已提交
142

M
Minghao Li 已提交
143
// Build a JSON description of the log store, including every entry.
//
// The result is an object of the form {"SSyncLogStore": {...}}. When the
// store has data and a WAL, the inner object carries "pSyncNode" and "pWal"
// (pointer values as strings), "LastIndex", "LastTerm", and a "pEntries" array
// with one element per index from 0 through the last index. Otherwise the
// inner object is left empty.
// The caller owns the returned tree and releases it with cJSON_Delete().
cJSON* logStore2Json(SSyncLogStore* pLogStore) {
  char               u64buf[128];
  SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data;
  cJSON*             pRoot = cJSON_CreateObject();

  if (pData != NULL && pData->pWal != NULL) {
    SyncIndex lastIndex = logStoreLastIndex(pLogStore);

    snprintf(u64buf, sizeof(u64buf), "%p", pData->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal);
    cJSON_AddStringToObject(pRoot, "pWal", u64buf);

    // Cast explicitly so the specifier matches the argument on every ABI;
    // "%ld"/"%lu" only match a 64-bit typedef where long is 64 bits wide.
    // Assumes SyncIndex/SyncTerm are integer types of at most 64 bits.
    snprintf(u64buf, sizeof(u64buf), "%lld", (long long)lastIndex);
    cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%llu", (unsigned long long)logStoreLastTerm(pLogStore));
    cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);

    cJSON* pEntries = cJSON_CreateArray();
    cJSON_AddItemToObject(pRoot, "pEntries", pEntries);
    for (SyncIndex i = 0; i <= lastIndex; ++i) {
      // Each entry is a temporary copy: serialize it, then release it.
      SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i);
      cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry));
      syncEntryDestory(pEntry);
    }
  }

  cJSON* pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncLogStore", pRoot);
  return pJson;
}
M
Minghao Li 已提交
172 173 174 175 176 177

// Serialize the full log store (see logStore2Json) to a printable string.
// The intermediate JSON tree is deleted here; the returned buffer comes from
// cJSON_Print() and must be released by the caller.
char* logStore2Str(SSyncLogStore* pLogStore) {
  cJSON* pTree = logStore2Json(pLogStore);
  char*  text = cJSON_Print(pTree);

  cJSON_Delete(pTree);
  return text;
}

M
Minghao Li 已提交
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
// Build a compact JSON description of the log store, without the entries.
//
// The result is an object of the form {"SSyncLogStoreSimple": {...}}. When
// the store has data and a WAL, the inner object carries "pSyncNode" and
// "pWal" (pointer values as strings), "LastIndex" and "LastTerm". Otherwise
// the inner object is left empty.
// The caller owns the returned tree and releases it with cJSON_Delete().
cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore) {
  char               u64buf[128];
  SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data;
  cJSON*             pRoot = cJSON_CreateObject();

  if (pData != NULL && pData->pWal != NULL) {
    snprintf(u64buf, sizeof(u64buf), "%p", pData->pSyncNode);
    cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal);
    cJSON_AddStringToObject(pRoot, "pWal", u64buf);

    // Cast explicitly so the specifier matches the argument on every ABI;
    // "%ld"/"%lu" only match a 64-bit typedef where long is 64 bits wide.
    // Assumes SyncIndex/SyncTerm are integer types of at most 64 bits.
    snprintf(u64buf, sizeof(u64buf), "%lld", (long long)logStoreLastIndex(pLogStore));
    cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%llu", (unsigned long long)logStoreLastTerm(pLogStore));
    cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);
  }

  cJSON* pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncLogStoreSimple", pRoot);
  return pJson;
}

// Serialize the compact log store summary (see logStoreSimple2Json) to a
// printable string. The intermediate JSON tree is deleted here; the returned
// buffer comes from cJSON_Print() and must be released by the caller.
char* logStoreSimple2Str(SSyncLogStore* pLogStore) {
  cJSON* pTree = logStoreSimple2Json(pLogStore);
  char*  text = cJSON_Print(pTree);

  cJSON_Delete(pTree);
  return text;
}

M
Minghao Li 已提交
208
// for debug -----------------
M
Minghao Li 已提交
209
// Debug helper: print the full log store dump and its length to stdout, then
// flush all output streams. Prints nothing if serialization fails.
void logStorePrint(SSyncLogStore* pLogStore) {
  char* serialized = logStore2Str(pLogStore);
  if (serialized == NULL) {
    return;  // cJSON_Print() failed; strlen(NULL) would be undefined
  }

  // strlen() yields size_t, so the matching specifier is %zu.
  printf("logStorePrint | len:%zu | %s \n", strlen(serialized), serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

// Debug helper: like logStorePrint(), with the caller-supplied label `s`
// printed between the length and the dump. Prints nothing if serialization
// fails.
void logStorePrint2(char* s, SSyncLogStore* pLogStore) {
  char* serialized = logStore2Str(pLogStore);
  if (serialized == NULL) {
    return;  // cJSON_Print() failed; strlen(NULL) would be undefined
  }

  // strlen() yields size_t, so the matching specifier is %zu.
  printf("logStorePrint2 | len:%zu | %s | %s \n", strlen(serialized), s, serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}
M
Minghao Li 已提交
222

M
Minghao Li 已提交
223 224
// Debug helper: write the full log store dump and its length through
// sTraceLong. Logs nothing if serialization fails.
void logStoreLog(SSyncLogStore* pLogStore) {
  char* serialized = logStore2Str(pLogStore);
  if (serialized == NULL) {
    return;  // cJSON_Print() failed; strlen(NULL) would be undefined
  }

  // The format keeps %lu, so convert the size_t length to the type %lu expects.
  sTraceLong("logStoreLog | len:%lu | %s", (unsigned long)strlen(serialized), serialized);
  taosMemoryFree(serialized);
}

// Debug helper: like logStoreLog(), with the caller-supplied label `s` logged
// between the length and the dump. Logs nothing if serialization fails.
void logStoreLog2(char* s, SSyncLogStore* pLogStore) {
  char* serialized = logStore2Str(pLogStore);
  if (serialized == NULL) {
    return;  // cJSON_Print() failed; strlen(NULL) would be undefined
  }

  // The format keeps %lu, so convert the size_t length to the type %lu expects.
  sTraceLong("logStoreLog2 | len:%lu | %s | %s", (unsigned long)strlen(serialized), s, serialized);
  taosMemoryFree(serialized);
}
M
Minghao Li 已提交
234 235 236 237 238 239

// for debug -----------------
// Debug helper: print the compact log store summary and its length to stdout,
// then flush all output streams. Prints nothing if serialization fails.
void logStoreSimplePrint(SSyncLogStore* pLogStore) {
  char* serialized = logStoreSimple2Str(pLogStore);
  if (serialized == NULL) {
    return;  // cJSON_Print() failed; strlen(NULL) would be undefined
  }

  // strlen() yields size_t, so the matching specifier is %zu.
  printf("logStoreSimplePrint | len:%zu | %s \n", strlen(serialized), serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

// Debug helper: like logStoreSimplePrint(), with the caller-supplied label `s`
// printed between the length and the summary. Prints nothing if serialization
// fails.
void logStoreSimplePrint2(char* s, SSyncLogStore* pLogStore) {
  char* serialized = logStoreSimple2Str(pLogStore);
  if (serialized == NULL) {
    return;  // cJSON_Print() failed; strlen(NULL) would be undefined
  }

  // strlen() yields size_t, so the matching specifier is %zu.
  printf("logStoreSimplePrint2 | len:%zu | %s | %s \n", strlen(serialized), s, serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

// Debug helper: write the compact log store summary and its length through
// sTrace. Logs nothing if serialization fails.
void logStoreSimpleLog(SSyncLogStore* pLogStore) {
  char* serialized = logStoreSimple2Str(pLogStore);
  if (serialized == NULL) {
    return;  // cJSON_Print() failed; strlen(NULL) would be undefined
  }

  // The format keeps %lu, so convert the size_t length to the type %lu expects.
  sTrace("logStoreSimpleLog | len:%lu | %s", (unsigned long)strlen(serialized), serialized);
  taosMemoryFree(serialized);
}

// Debug helper: like logStoreSimpleLog(), with the caller-supplied label `s`
// logged between the length and the summary. Logs nothing if serialization
// fails.
void logStoreSimpleLog2(char* s, SSyncLogStore* pLogStore) {
  char* serialized = logStoreSimple2Str(pLogStore);
  if (serialized == NULL) {
    return;  // cJSON_Print() failed; strlen(NULL) would be undefined
  }

  // The format keeps %lu, so convert the size_t length to the type %lu expects.
  sTrace("logStoreSimpleLog2 | len:%lu | %s | %s", (unsigned long)strlen(serialized), s, serialized);
  taosMemoryFree(serialized);
}