syncRaftEntry.c 13.4 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "syncRaftEntry.h"
M
Minghao Li 已提交
18
#include "syncUtil.h"
S
Shengliang Guan 已提交
19
#include "tref.h"
M
Minghao Li 已提交
20

21 22
SSyncRaftEntry* syncEntryBuild(int32_t dataLen) {
  int32_t         bytes = sizeof(SSyncRaftEntry) + dataLen;
wafwerar's avatar
wafwerar 已提交
23
  SSyncRaftEntry* pEntry = taosMemoryMalloc(bytes);
24 25 26 27 28
  if (pEntry == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

M
Minghao Li 已提交
29
  pEntry->bytes = bytes;
M
Minghao Li 已提交
30
  pEntry->dataLen = dataLen;
31
  pEntry->rid = -1;
M
Minghao Li 已提交
32 33 34 35

  return pEntry;
}

36
/*
 * Wrap a client request into a new raft entry stamped with `term` and `index`.
 * The message type, original RPC type, sequence number and weak flag are taken
 * from the request, and its payload is copied, so the caller keeps ownership of
 * pMsg. Returns NULL when the entry cannot be allocated.
 */
SSyncRaftEntry* syncEntryBuildFromClientRequest(const SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) {
  SSyncRaftEntry* pNew = syncEntryBuild(pMsg->dataLen);
  if (pNew != NULL) {
    pNew->term = term;
    pNew->index = index;
    pNew->msgType = pMsg->msgType;
    pNew->originalRpcType = pMsg->originalRpcType;
    pNew->seqNum = pMsg->seqNum;
    pNew->isWeak = pMsg->isWeak;
    memcpy(pNew->data, pMsg->data, pMsg->dataLen);
  }
  return pNew;
}

51 52 53
/*
 * Wrap an RPC message into a raft entry for (term, index).
 * The entry is tagged TDMT_SYNC_CLIENT_REQUEST and keeps the RPC's own type in
 * originalRpcType. It has seqNum 0, is not weak, and holds a copy of pMsg->pCont.
 * Returns NULL when the entry cannot be allocated.
 */
SSyncRaftEntry* syncEntryBuildFromRpcMsg(const SRpcMsg* pMsg, SyncTerm term, SyncIndex index) {
  const int32_t   payloadLen = pMsg->contLen;
  SSyncRaftEntry* pNew = syncEntryBuild(payloadLen);
  if (pNew == NULL) {
    return NULL;
  }

  memcpy(pNew->data, pMsg->pCont, payloadLen);
  pNew->term = term;
  pNew->index = index;
  pNew->seqNum = 0;
  pNew->isWeak = 0;
  pNew->msgType = TDMT_SYNC_CLIENT_REQUEST;
  pNew->originalRpcType = pMsg->msgType;
  return pNew;
}

SSyncRaftEntry* syncEntryBuildFromAppendEntries(const SyncAppendEntries* pMsg) {
  SSyncRaftEntry* pEntry = syncEntryBuild(pMsg->dataLen);
  if (pEntry == NULL) return NULL;
69

70
  memcpy(pEntry, pMsg->data, pMsg->dataLen);
71 72 73
  return pEntry;
}

M
Minghao Li 已提交
74
/*
 * Build a no-op entry for (term, index).
 * The payload is a bare SMsgHead addressed to `vgId` whose contLen covers only
 * the head itself. The entry is tagged TDMT_SYNC_CLIENT_REQUEST with
 * TDMT_SYNC_NOOP as its original RPC type. Returns NULL on allocation failure.
 */
SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId) {
  const int32_t   headLen = sizeof(SMsgHead);
  SSyncRaftEntry* pNoop = syncEntryBuild(headLen);
  if (pNoop == NULL) {
    return NULL;
  }

  pNoop->term = term;
  pNoop->index = index;
  pNoop->msgType = TDMT_SYNC_CLIENT_REQUEST;
  pNoop->originalRpcType = TDMT_SYNC_NOOP;
  pNoop->isWeak = 0;
  pNoop->seqNum = 0;

  SMsgHead* pHead = (SMsgHead*)pNoop->data;
  pHead->contLen = headLen;
  pHead->vgId = vgId;
  return pNoop;
}

M
Minghao Li 已提交
92 93
/* Free an entry produced by one of the syncEntryBuild* helpers; NULL is ignored. */
void syncEntryDestory(SSyncRaftEntry* pEntry) {
  if (pEntry == NULL) {
    return;
  }
  taosMemoryFree(pEntry);
}

M
Minghao Li 已提交
98 99 100 101 102 103 104
/*
 * Recover the original RPC message carried by `pEntry`.
 * msgType is restored from originalRpcType, and the payload is copied into a
 * new RPC buffer (rpcMallocCont) stored in pRpcMsg->pCont for the caller.
 *
 * If that buffer cannot be allocated, pRpcMsg->pCont is NULL, contLen is 0 and
 * terrno is TSDB_CODE_OUT_OF_MEMORY. Callers should check pCont.
 */
void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg) {
  pRpcMsg->msgType = pEntry->originalRpcType;
  pRpcMsg->contLen = pEntry->dataLen;
  pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
  if (pRpcMsg->pCont == NULL) {
    // Never memcpy into a NULL buffer; hand back an empty message instead.
    pRpcMsg->contLen = 0;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }
  memcpy(pRpcMsg->pCont, pEntry->data, pRpcMsg->contLen);
}

105 106
/*
 * Create a hash-based entry cache for `pSyncNode` that accepts at most
 * `maxCount` entries. Entries are keyed by SyncIndex, the hash table is created
 * with updates enabled and HASH_NO_LOCK, and access is guarded by the cache's
 * own mutex. Returns NULL on failure; no partially built cache is leaked.
 */
SRaftEntryHashCache* raftCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) {
  SRaftEntryHashCache* pCache = taosMemoryMalloc(sizeof(SRaftEntryHashCache));
  if (pCache == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, raft cache create error", pSyncNode->vgId);
    return NULL;
  }

  pCache->pEntryHash =
      taosHashInit(sizeof(SyncIndex), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  if (pCache->pEntryHash == NULL) {
    sError("vgId:%d, raft cache create hash error", pSyncNode->vgId);
    // Release the cache shell; previously it leaked on this path.
    taosMemoryFree(pCache);
    return NULL;
  }

  taosThreadMutexInit(&(pCache->mutex), NULL);
  pCache->maxCount = maxCount;
  pCache->currentCount = 0;
  pCache->pSyncNode = pSyncNode;

  return pCache;
}

127
/*
 * Tear down a cache created by raftCacheCreate.
 * The hash table is cleaned up while holding the cache mutex; the mutex and the
 * cache struct are then released. NULL is ignored.
 */
void raftCacheDestroy(SRaftEntryHashCache* pCache) {
  if (pCache == NULL) {
    return;
  }

  taosThreadMutexLock(&(pCache->mutex));
  taosHashCleanup(pCache->pEntryHash);
  taosThreadMutexUnlock(&(pCache->mutex));

  taosThreadMutexDestroy(&(pCache->mutex));
  taosMemoryFree(pCache);
}

// success, return 1
// max count, return 0
// error, return -1
140
int32_t raftCachePutEntry(struct SRaftEntryHashCache* pCache, SSyncRaftEntry* pEntry) {
S
Shengliang Guan 已提交
141
  taosThreadMutexLock(&pCache->mutex);
M
Minghao Li 已提交
142 143

  if (pCache->currentCount >= pCache->maxCount) {
S
Shengliang Guan 已提交
144
    taosThreadMutexUnlock(&pCache->mutex);
M
Minghao Li 已提交
145 146 147 148 149 150
    return 0;
  }

  taosHashPut(pCache->pEntryHash, &(pEntry->index), sizeof(pEntry->index), pEntry, pEntry->bytes);
  ++(pCache->currentCount);

S
Shengliang Guan 已提交
151 152 153 154
  sNTrace(pCache->pSyncNode, "raft cache add, type:%s,%d, type2:%s,%d, index:%" PRId64 ", bytes:%d",
          TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType,
          pEntry->index, pEntry->bytes);
  taosThreadMutexUnlock(&pCache->mutex);
M
Minghao Li 已提交
155 156 157 158 159 160
  return 1;
}

// success, return 0
// error, return -1
// not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
161
int32_t raftCacheGetEntry(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
M
Minghao Li 已提交
162 163 164 165 166
  if (ppEntry == NULL) {
    return -1;
  }
  *ppEntry = NULL;

S
Shengliang Guan 已提交
167
  taosThreadMutexLock(&pCache->mutex);
M
Minghao Li 已提交
168 169 170 171 172 173
  void* pTmp = taosHashGet(pCache->pEntryHash, &index, sizeof(index));
  if (pTmp != NULL) {
    SSyncRaftEntry* pEntry = pTmp;
    *ppEntry = taosMemoryMalloc(pEntry->bytes);
    memcpy(*ppEntry, pTmp, pEntry->bytes);

S
Shengliang Guan 已提交
174 175 176 177
    sNTrace(pCache->pSyncNode, "raft cache get, type:%s,%d, type2:%s,%d, index:%" PRId64,
            TMSG_INFO((*ppEntry)->msgType), (*ppEntry)->msgType, TMSG_INFO((*ppEntry)->originalRpcType),
            (*ppEntry)->originalRpcType, (*ppEntry)->index);
    taosThreadMutexUnlock(&pCache->mutex);
M
Minghao Li 已提交
178 179 180
    return 0;
  }

S
Shengliang Guan 已提交
181
  taosThreadMutexUnlock(&pCache->mutex);
M
Minghao Li 已提交
182 183 184 185 186 187 188
  terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
  return -1;
}

// success, return 0
// error, return -1
// not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
189
int32_t raftCacheGetEntryP(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
M
Minghao Li 已提交
190 191 192 193 194
  if (ppEntry == NULL) {
    return -1;
  }
  *ppEntry = NULL;

S
Shengliang Guan 已提交
195
  taosThreadMutexLock(&pCache->mutex);
M
Minghao Li 已提交
196 197 198 199 200
  void* pTmp = taosHashGet(pCache->pEntryHash, &index, sizeof(index));
  if (pTmp != NULL) {
    SSyncRaftEntry* pEntry = pTmp;
    *ppEntry = pEntry;

S
Shengliang Guan 已提交
201 202 203 204
    sNTrace(pCache->pSyncNode, "raft cache get, type:%s,%d, type2:%s,%d, index:%" PRId64,
            TMSG_INFO((*ppEntry)->msgType), (*ppEntry)->msgType, TMSG_INFO((*ppEntry)->originalRpcType),
            (*ppEntry)->originalRpcType, (*ppEntry)->index);
    taosThreadMutexUnlock(&pCache->mutex);
M
Minghao Li 已提交
205 206 207
    return 0;
  }

S
Shengliang Guan 已提交
208
  taosThreadMutexUnlock(&pCache->mutex);
M
Minghao Li 已提交
209 210 211 212
  terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
  return -1;
}

213
/*
 * Remove the entry cached under `index`, if any. Always returns 0.
 *
 * currentCount is decremented only when taosHashRemove reports that an entry
 * was actually removed. Deleting an index that is not cached therefore no
 * longer drives the count below the number of stored entries.
 */
int32_t raftCacheDelEntry(struct SRaftEntryHashCache* pCache, SyncIndex index) {
  taosThreadMutexLock(&pCache->mutex);
  if (taosHashRemove(pCache->pEntryHash, &index, sizeof(index)) == 0) {
    --(pCache->currentCount);
  }
  taosThreadMutexUnlock(&pCache->mutex);
  return 0;
}

221
/*
 * Remove the entry cached under `index` and return a heap copy of it in
 * *ppEntry. The copy is owned by the caller.
 *
 * success, return 0
 * error, return -1 (ppEntry is NULL, or the copy could not be allocated:
 *                   terrno = TSDB_CODE_OUT_OF_MEMORY, and the entry stays cached)
 * not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
 */
int32_t raftCacheGetAndDel(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
  if (ppEntry == NULL) {
    return -1;
  }
  *ppEntry = NULL;

  taosThreadMutexLock(&pCache->mutex);
  SSyncRaftEntry* pCached = taosHashGet(pCache->pEntryHash, &index, sizeof(index));
  if (pCached == NULL) {
    taosThreadMutexUnlock(&pCache->mutex);
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

  SSyncRaftEntry* pCopy = taosMemoryMalloc(pCached->bytes);
  if (pCopy == NULL) {
    // Previously the result was memcpy'd into unconditionally.
    taosThreadMutexUnlock(&pCache->mutex);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  memcpy(pCopy, pCached, pCached->bytes);
  *ppEntry = pCopy;

  sNTrace(pCache->pSyncNode, "raft cache get-and-del, type:%s,%d, type2:%s,%d, index:%" PRId64,
          TMSG_INFO(pCopy->msgType), pCopy->msgType, TMSG_INFO(pCopy->originalRpcType), pCopy->originalRpcType,
          pCopy->index);

  // pCached points into the hash table; it must not be used after this removal.
  taosHashRemove(pCache->pEntryHash, &index, sizeof(index));
  --(pCache->currentCount);

  taosThreadMutexUnlock(&pCache->mutex);
  return 0;
}

250
int32_t raftCacheClear(struct SRaftEntryHashCache* pCache) {
S
Shengliang Guan 已提交
251
  taosThreadMutexLock(&pCache->mutex);
M
Minghao Li 已提交
252
  taosHashClear(pCache->pEntryHash);
M
Minghao Li 已提交
253
  pCache->currentCount = 0;
S
Shengliang Guan 已提交
254
  taosThreadMutexUnlock(&pCache->mutex);
M
Minghao Li 已提交
255 256 257
  return 0;
}

258 259 260 261 262 263 264
/* Skip-list key extractor: an entry's key is the raw bytes of its `index` field. */
static char* keyFn(const void* pData) { return (char*)&(((SSyncRaftEntry*)pData)->index); }

static int cmpFn(const void* p1, const void* p2) { return memcmp(p1, p2, sizeof(SyncIndex)); }

265 266 267 268 269
/* Ref-set free callback (passed to taosOpenRef in raftEntryCacheCreate): releases the entry's memory. */
static void freeRaftEntry(void* param) { syncEntryDestory((SSyncRaftEntry*)param); }

270 271 272
/*
 * Create a skip-list entry cache for `pSyncNode` holding at most `maxCount`
 * entries. Entries are ordered by cmpFn over the key produced by keyFn.
 * Their lifetime is tracked by a dedicated ref set whose free callback is
 * freeRaftEntry.
 *
 * Returns NULL on failure. Any resources created before the failure are
 * released, so nothing is leaked.
 */
SRaftEntryCache* raftEntryCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) {
  SRaftEntryCache* pCache = taosMemoryMalloc(sizeof(SRaftEntryCache));
  if (pCache == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, raft cache create error", pSyncNode->vgId);
    return NULL;
  }

  pCache->pSkipList =
      tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, sizeof(SyncIndex), cmpFn, SL_ALLOW_DUP_KEY, keyFn);
  if (pCache->pSkipList == NULL) {
    sError("vgId:%d, raft cache create hash error", pSyncNode->vgId);
    taosMemoryFree(pCache);
    return NULL;
  }

  pCache->refMgr = taosOpenRef(10, freeRaftEntry);
  if (pCache->refMgr < 0) {
    // Without a ref set, taosAddRef in raftEntryCachePutEntry cannot succeed.
    sError("vgId:%d, raft cache create ref error", pSyncNode->vgId);
    tSkipListDestroy(pCache->pSkipList);
    taosMemoryFree(pCache);
    return NULL;
  }

  taosThreadMutexInit(&(pCache->mutex), NULL);
  pCache->maxCount = maxCount;
  pCache->currentCount = 0;
  pCache->pSyncNode = pSyncNode;

  return pCache;
}

/*
 * Tear down a cache created by raftEntryCacheCreate.
 * Under the cache mutex, the skip list is destroyed and the ref set is closed
 * (refMgr is reset to -1). The mutex and the cache struct are then freed.
 * NULL is ignored.
 * NOTE(review): entries still in the skip list are not passed to taosRemoveRef
 * here. Confirm they are released elsewhere (e.g. by clearing the cache first).
 */
void raftEntryCacheDestroy(SRaftEntryCache* pCache) {
  if (pCache == NULL) {
    return;
  }

  taosThreadMutexLock(&(pCache->mutex));
  tSkipListDestroy(pCache->pSkipList);
  if (pCache->refMgr != -1) {
    taosCloseRef(pCache->refMgr);
    pCache->refMgr = -1;
  }
  taosThreadMutexUnlock(&(pCache->mutex));

  taosThreadMutexDestroy(&(pCache->mutex));
  taosMemoryFree(pCache);
}

// Insert `pEntry` itself (not a copy) into the skip list and register it in the
// cache's ref set; pEntry->rid receives the id returned by taosAddRef.
//
// success, return 1
// max count, return 0 (currentCount >= maxCount; nothing is inserted)
// error, return -1 (documented; failures here are currently reported via ASSERT)
int32_t raftEntryCachePutEntry(struct SRaftEntryCache* pCache, SSyncRaftEntry* pEntry) {
  taosThreadMutexLock(&pCache->mutex);

  if (!(pCache->currentCount < pCache->maxCount)) {
    taosThreadMutexUnlock(&pCache->mutex);
    return 0;
  }

  SSkipListNode* pInserted = tSkipListPut(pCache->pSkipList, pEntry);
  ASSERT(pInserted != NULL);
  pCache->currentCount += 1;

  pEntry->rid = taosAddRef(pCache->refMgr, pEntry);
  ASSERT(pEntry->rid >= 0);

  sNTrace(pCache->pSyncNode, "raft cache add, type:%s,%d, type2:%s,%d, index:%" PRId64 ", bytes:%d",
          TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType,
          pEntry->index, pEntry->bytes);
  taosThreadMutexUnlock(&pCache->mutex);
  return 1;
}

// Copy the cached entry for `index` into a new heap buffer returned in *ppEntry.
// The copy is owned by the caller. Its rid is reset to -1 because the copy is
// not registered in the ref set.
//
// find one, return 1
// not found, return 0
// error, return -1 (including failure to allocate the copy:
//                   terrno = TSDB_CODE_OUT_OF_MEMORY)
int32_t raftEntryCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
  ASSERT(ppEntry != NULL);
  *ppEntry = NULL;

  SSyncRaftEntry* pEntry = NULL;
  int32_t         code = raftEntryCacheGetEntryP(pCache, index, &pEntry);
  if (code != 1) {
    return code;
  }

  SSyncRaftEntry* pCopy = taosMemoryMalloc((int64_t)(pEntry->bytes));
  if (pCopy != NULL) {
    memcpy(pCopy, pEntry, pEntry->bytes);
    pCopy->rid = -1;
  }

  // raftEntryCacheGetEntryP acquired a reference on the cached entry
  // (taosAcquireRef). Only the copy escapes this function, so that reference
  // must be released here; otherwise it is leaked and the ref set can never
  // free the entry.
  taosReleaseRef(pCache->refMgr, pEntry->rid);

  if (pCopy == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  *ppEntry = pCopy;
  return 1;
}

// Look up `index` and return the cached entry itself (no copy) in *ppEntry.
// A reference is acquired on it with taosAcquireRef(pCache->refMgr, rid).
// NOTE(review): this function never releases that reference. Callers presumably
// must call taosReleaseRef(pCache->refMgr, (*ppEntry)->rid) when done; confirm.
// *ppEntry is left untouched when nothing is found.
//
// find one, return 1
// not found, return 0
// error, return -1 (more than one node stored under the same index)
int32_t raftEntryCacheGetEntryP(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
  taosThreadMutexLock(&pCache->mutex);

  SyncIndex key = index;
  SArray*   pHits = tSkipListGet(pCache->pSkipList, (char*)(&key));
  int32_t   hitCount = taosArrayGetSize(pHits);
  int32_t   code;

  switch (hitCount) {
    case 0:
      code = 0;
      break;

    case 1: {
      SSkipListNode** ppNode = (SSkipListNode**)taosArrayGet(pHits, 0);
      ASSERT(*ppNode != NULL);
      *ppEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(*ppNode);
      taosAcquireRef(pCache->refMgr, (*ppEntry)->rid);
      code = 1;
      break;
    }

    default:
      ASSERT(0);
      code = -1;
      break;
  }

  taosArrayDestroy(pHits);
  taosThreadMutexUnlock(&pCache->mutex);
  return code;
}

// Drop entries from the front of the skip list (in cmpFn order).
//
// count = -1, clear all
// count >= 0, clear (at most) count entries
// return -1, error
// return delete count
//
// Both modes release entries through the ref set (taosRemoveRef), and the ref
// set's free callback frees the memory. Previously the clear-all mode freed
// entries directly with syncEntryDestory. That left dangling pointers in
// refMgr, and it freed entries that a reader could still hold through
// raftEntryCacheGetEntryP.
int32_t raftEntryCacheClear(struct SRaftEntryCache* pCache, int32_t count) {
  taosThreadMutexLock(&pCache->mutex);
  int32_t returnCnt = 0;

  if (count == -1) {
    // clear all: release every entry, then swap in a fresh, empty skip list
    SSkipListIterator* pIter = tSkipListCreateIter(pCache->pSkipList);
    while (tSkipListIterNext(pIter)) {
      SSkipListNode* pNode = tSkipListIterGet(pIter);
      ASSERT(pNode != NULL);
      SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
      taosRemoveRef(pCache->refMgr, pEntry->rid);
      ++returnCnt;
    }
    tSkipListDestroyIter(pIter);

    tSkipListDestroy(pCache->pSkipList);
    pCache->pSkipList =
        tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, sizeof(SyncIndex), cmpFn, SL_ALLOW_DUP_KEY, keyFn);
    ASSERT(pCache->pSkipList != NULL);

  } else {
    // clear count: collect the first `count` nodes, release their entries, then
    // unlink the nodes once iteration is finished
    SArray* delNodeArray = taosArrayInit(0, sizeof(SSkipListNode*));
    if (delNodeArray == NULL) {
      taosThreadMutexUnlock(&pCache->mutex);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    SSkipListIterator* pIter = tSkipListCreateIter(pCache->pSkipList);
    while (tSkipListIterNext(pIter)) {
      if (returnCnt >= count) {
        break;
      }
      SSkipListNode* pNode = tSkipListIterGet(pIter);
      ASSERT(pNode != NULL);

      // Record the node before releasing its entry. Every released entry is then
      // guaranteed to be unlinked below, so no node outlives its entry.
      if (taosArrayPush(delNodeArray, &pNode) == NULL) {
        break;
      }
      SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
      taosRemoveRef(pCache->refMgr, pEntry->rid);
      ++returnCnt;
    }
    tSkipListDestroyIter(pIter);

    int32_t nodeCnt = taosArrayGetSize(delNodeArray);
    for (int32_t k = 0; k < nodeCnt; ++k) {
      SSkipListNode** ppNode = taosArrayGet(delNodeArray, k);
      tSkipListRemoveNode(pCache->pSkipList, *ppNode);
    }
    taosArrayDestroy(delNodeArray);
  }

  pCache->currentCount -= returnCnt;
  taosThreadMutexUnlock(&pCache->mutex);
  return returnCnt;
}