syncRaftEntry.c 13.5 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "syncRaftEntry.h"
M
Minghao Li 已提交
18
#include "syncUtil.h"
S
Shengliang Guan 已提交
19
#include "tref.h"
M
Minghao Li 已提交
20

21 22
SSyncRaftEntry* syncEntryBuild(int32_t dataLen) {
  int32_t         bytes = sizeof(SSyncRaftEntry) + dataLen;
wafwerar's avatar
wafwerar 已提交
23
  SSyncRaftEntry* pEntry = taosMemoryMalloc(bytes);
24 25 26 27 28
  if (pEntry == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

M
Minghao Li 已提交
29
  pEntry->bytes = bytes;
M
Minghao Li 已提交
30
  pEntry->dataLen = dataLen;
31
  pEntry->rid = -1;
M
Minghao Li 已提交
32 33 34 35

  return pEntry;
}

36
// Build a raft entry from a client request: the request's msgType, originalRpcType,
// seqNum, isWeak and payload are copied, and the entry is stamped with the given
// `term` and `index`.
// Returns NULL when syncEntryBuild fails.
SSyncRaftEntry* syncEntryBuildFromClientRequest(const SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) {
  SSyncRaftEntry* pNew = syncEntryBuild(pMsg->dataLen);

  if (pNew != NULL) {
    // log position assigned by the caller
    pNew->term = term;
    pNew->index = index;

    // metadata carried over from the request
    pNew->msgType = pMsg->msgType;
    pNew->originalRpcType = pMsg->originalRpcType;
    pNew->seqNum = pMsg->seqNum;
    pNew->isWeak = pMsg->isWeak;

    memcpy(pNew->data, pMsg->data, pMsg->dataLen);
  }

  return pNew;
}

51 52 53
// Wrap a raw RPC message in a raft entry. The entry's msgType is always
// TDMT_SYNC_CLIENT_REQUEST, the RPC's own msgType is kept in originalRpcType,
// seqNum and isWeak are zeroed, and pMsg->pCont (contLen bytes) becomes the payload.
// Returns NULL when syncEntryBuild fails.
SSyncRaftEntry* syncEntryBuildFromRpcMsg(const SRpcMsg* pMsg, SyncTerm term, SyncIndex index) {
  SSyncRaftEntry* pNew = syncEntryBuild(pMsg->contLen);

  if (pNew != NULL) {
    pNew->term = term;
    pNew->index = index;

    pNew->msgType = TDMT_SYNC_CLIENT_REQUEST;
    pNew->originalRpcType = pMsg->msgType;
    pNew->seqNum = 0;
    pNew->isWeak = 0;

    memcpy(pNew->data, pMsg->pCont, pMsg->contLen);
  }

  return pNew;
}

SSyncRaftEntry* syncEntryBuildFromAppendEntries(const SyncAppendEntries* pMsg) {
67
  SSyncRaftEntry* pEntry = syncEntryBuild((int32_t)(pMsg->dataLen));
68
  if (pEntry == NULL) return NULL;
69

70
  memcpy(pEntry, pMsg->data, pMsg->dataLen);
71 72 73
  return pEntry;
}

M
Minghao Li 已提交
74
// Build a no-op raft entry whose payload is a single SMsgHead.
// msgType is TDMT_SYNC_CLIENT_REQUEST, originalRpcType is TDMT_SYNC_NOOP, seqNum and
// isWeak are zero; the head carries `vgId` and contLen = sizeof(SMsgHead).
// Returns NULL when syncEntryBuild fails.
SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId) {
  SSyncRaftEntry* pNoop = syncEntryBuild(sizeof(SMsgHead));
  if (pNoop == NULL) {
    return NULL;
  }

  pNoop->term = term;
  pNoop->index = index;
  pNoop->msgType = TDMT_SYNC_CLIENT_REQUEST;
  pNoop->originalRpcType = TDMT_SYNC_NOOP;
  pNoop->seqNum = 0;
  pNoop->isWeak = 0;

  // the payload area is interpreted as the message head
  SMsgHead* pMsgHead = (SMsgHead*)pNoop->data;
  pMsgHead->contLen = sizeof(SMsgHead);
  pMsgHead->vgId = vgId;

  return pNoop;
}

M
Minghao Li 已提交
92 93
// Free a raft entry, logging its address at trace level first. NULL is ignored.
void syncEntryDestory(SSyncRaftEntry* pEntry) {
  if (pEntry == NULL) {
    return;
  }

  sTrace("free entry: %p", pEntry);
  taosMemoryFree(pEntry);
}

M
Minghao Li 已提交
99 100
// Convert a raft entry back into its original RPC message.
// pRpcMsg->msgType is set to the entry's originalRpcType and a new content buffer of
// dataLen bytes is obtained from rpcMallocCont and filled with the entry payload.
// If the buffer cannot be allocated, pCont stays NULL, contLen is reset to 0 and
// terrno is set to TSDB_CODE_OUT_OF_MEMORY (the function itself returns nothing).
void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg) {
  pRpcMsg->msgType = pEntry->originalRpcType;
  pRpcMsg->contLen = (int32_t)(pEntry->dataLen);
  pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
  if (pRpcMsg->pCont == NULL) {
    // never memcpy into a NULL buffer; leave the message self-consistent instead
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRpcMsg->contLen = 0;
    return;
  }

  memcpy(pRpcMsg->pCont, pEntry->data, pRpcMsg->contLen);
}

106 107
// Create a hash-based raft entry cache that holds at most `maxCount` entries.
// The hash is keyed by SyncIndex-sized keys and created with HASH_NO_LOCK; the cache
// carries its own mutex. `pSyncNode` is stored in the cache and used for logging.
// Returns NULL on failure; nothing is leaked in that case.
SRaftEntryHashCache* raftCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) {
  SRaftEntryHashCache* pCache = taosMemoryMalloc(sizeof(SRaftEntryHashCache));
  if (pCache == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, raft cache create error", pSyncNode->vgId);
    return NULL;
  }

  pCache->pEntryHash =
      taosHashInit(sizeof(SyncIndex), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  if (pCache->pEntryHash == NULL) {
    sError("vgId:%d, raft cache create hash error", pSyncNode->vgId);
    // release the half-built cache instead of leaking it
    taosMemoryFree(pCache);
    return NULL;
  }

  taosThreadMutexInit(&(pCache->mutex), NULL);
  pCache->maxCount = maxCount;
  pCache->currentCount = 0;
  pCache->pSyncNode = pSyncNode;

  return pCache;
}

128
// Tear down a hash cache: the hash is cleaned up while the cache mutex is held,
// then the mutex is destroyed and the cache struct is freed. NULL is ignored.
void raftCacheDestroy(SRaftEntryHashCache* pCache) {
  if (pCache == NULL) {
    return;
  }

  taosThreadMutexLock(&pCache->mutex);
  taosHashCleanup(pCache->pEntryHash);
  taosThreadMutexUnlock(&pCache->mutex);

  taosThreadMutexDestroy(&(pCache->mutex));
  taosMemoryFree(pCache);
}

// success, return 1
// max count, return 0
// error, return -1
141
int32_t raftCachePutEntry(struct SRaftEntryHashCache* pCache, SSyncRaftEntry* pEntry) {
S
Shengliang Guan 已提交
142
  taosThreadMutexLock(&pCache->mutex);
M
Minghao Li 已提交
143 144

  if (pCache->currentCount >= pCache->maxCount) {
S
Shengliang Guan 已提交
145
    taosThreadMutexUnlock(&pCache->mutex);
M
Minghao Li 已提交
146 147 148 149 150 151
    return 0;
  }

  taosHashPut(pCache->pEntryHash, &(pEntry->index), sizeof(pEntry->index), pEntry, pEntry->bytes);
  ++(pCache->currentCount);

S
Shengliang Guan 已提交
152 153 154 155
  sNTrace(pCache->pSyncNode, "raft cache add, type:%s,%d, type2:%s,%d, index:%" PRId64 ", bytes:%d",
          TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType,
          pEntry->index, pEntry->bytes);
  taosThreadMutexUnlock(&pCache->mutex);
M
Minghao Li 已提交
156 157 158 159 160 161
  return 1;
}

// success, return 0
// error, return -1
// not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
162
int32_t raftCacheGetEntry(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
M
Minghao Li 已提交
163 164 165 166 167
  if (ppEntry == NULL) {
    return -1;
  }
  *ppEntry = NULL;

S
Shengliang Guan 已提交
168
  taosThreadMutexLock(&pCache->mutex);
M
Minghao Li 已提交
169 170 171 172 173 174
  void* pTmp = taosHashGet(pCache->pEntryHash, &index, sizeof(index));
  if (pTmp != NULL) {
    SSyncRaftEntry* pEntry = pTmp;
    *ppEntry = taosMemoryMalloc(pEntry->bytes);
    memcpy(*ppEntry, pTmp, pEntry->bytes);

S
Shengliang Guan 已提交
175 176 177 178
    sNTrace(pCache->pSyncNode, "raft cache get, type:%s,%d, type2:%s,%d, index:%" PRId64,
            TMSG_INFO((*ppEntry)->msgType), (*ppEntry)->msgType, TMSG_INFO((*ppEntry)->originalRpcType),
            (*ppEntry)->originalRpcType, (*ppEntry)->index);
    taosThreadMutexUnlock(&pCache->mutex);
M
Minghao Li 已提交
179 180 181
    return 0;
  }

S
Shengliang Guan 已提交
182
  taosThreadMutexUnlock(&pCache->mutex);
M
Minghao Li 已提交
183 184 185 186 187 188 189
  terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
  return -1;
}

// success, return 0
// error, return -1
// not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
190
int32_t raftCacheGetEntryP(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
M
Minghao Li 已提交
191 192 193 194 195
  if (ppEntry == NULL) {
    return -1;
  }
  *ppEntry = NULL;

S
Shengliang Guan 已提交
196
  taosThreadMutexLock(&pCache->mutex);
M
Minghao Li 已提交
197 198 199 200 201
  void* pTmp = taosHashGet(pCache->pEntryHash, &index, sizeof(index));
  if (pTmp != NULL) {
    SSyncRaftEntry* pEntry = pTmp;
    *ppEntry = pEntry;

S
Shengliang Guan 已提交
202 203 204 205
    sNTrace(pCache->pSyncNode, "raft cache get, type:%s,%d, type2:%s,%d, index:%" PRId64,
            TMSG_INFO((*ppEntry)->msgType), (*ppEntry)->msgType, TMSG_INFO((*ppEntry)->originalRpcType),
            (*ppEntry)->originalRpcType, (*ppEntry)->index);
    taosThreadMutexUnlock(&pCache->mutex);
M
Minghao Li 已提交
206 207 208
    return 0;
  }

S
Shengliang Guan 已提交
209
  taosThreadMutexUnlock(&pCache->mutex);
M
Minghao Li 已提交
210 211 212 213
  terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
  return -1;
}

214
// Remove the entry stored under `index` from the hash cache.
// currentCount is decremented only when taosHashRemove reports that something was
// actually removed, so deleting an absent index cannot push the count out of step
// with the hash contents. Always returns 0.
int32_t raftCacheDelEntry(struct SRaftEntryHashCache* pCache, SyncIndex index) {
  taosThreadMutexLock(&pCache->mutex);
  if (taosHashRemove(pCache->pEntryHash, &index, sizeof(index)) == 0) {
    --(pCache->currentCount);
  }
  taosThreadMutexUnlock(&pCache->mutex);
  return 0;
}

222
// Look up `index`, return a newly allocated copy of the entry in *ppEntry and remove
// the cached entry, all under the cache mutex. The copy belongs to the caller.
// success, return 0
// ppEntry == NULL, return -1
// allocation failure, return -1, terrno = TSDB_CODE_OUT_OF_MEMORY (entry stays cached)
// not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
int32_t raftCacheGetAndDel(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
  if (ppEntry == NULL) {
    return -1;
  }
  *ppEntry = NULL;

  taosThreadMutexLock(&pCache->mutex);
  void* pTmp = taosHashGet(pCache->pEntryHash, &index, sizeof(index));
  if (pTmp == NULL) {
    taosThreadMutexUnlock(&pCache->mutex);
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

  SSyncRaftEntry* pCached = pTmp;
  SSyncRaftEntry* pCopy = taosMemoryMalloc(pCached->bytes);
  if (pCopy == NULL) {
    // without a copy to hand out, keep the cached entry rather than lose it
    taosThreadMutexUnlock(&pCache->mutex);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  memcpy(pCopy, pCached, pCached->bytes);
  *ppEntry = pCopy;

  sNTrace(pCache->pSyncNode, "raft cache get-and-del, type:%s,%d, type2:%s,%d, index:%" PRId64,
          TMSG_INFO((*ppEntry)->msgType), (*ppEntry)->msgType, TMSG_INFO((*ppEntry)->originalRpcType),
          (*ppEntry)->originalRpcType, (*ppEntry)->index);

  taosHashRemove(pCache->pEntryHash, &index, sizeof(index));
  --(pCache->currentCount);

  taosThreadMutexUnlock(&pCache->mutex);
  return 0;
}

251
int32_t raftCacheClear(struct SRaftEntryHashCache* pCache) {
S
Shengliang Guan 已提交
252
  taosThreadMutexLock(&pCache->mutex);
M
Minghao Li 已提交
253
  taosHashClear(pCache->pEntryHash);
M
Minghao Li 已提交
254
  pCache->currentCount = 0;
S
Shengliang Guan 已提交
255
  taosThreadMutexUnlock(&pCache->mutex);
M
Minghao Li 已提交
256 257 258
  return 0;
}

259 260 261 262 263 264 265
// Skip-list key accessor: treats `pData` as an SSyncRaftEntry and returns the
// address of its `index` field.
static char* keyFn(const void* pData) {
  return (char*)&(((SSyncRaftEntry*)pData)->index);
}

// Skip-list key comparator: byte-wise memcmp over sizeof(SyncIndex) bytes.
// NOTE(review): a byte-wise compare matches numeric order only for big-endian,
// non-negative keys — confirm whether iteration order of the skip list matters.
static int cmpFn(const void* p1, const void* p2) {
  const size_t keyBytes = sizeof(SyncIndex);
  return memcmp(p1, p2, keyBytes);
}

266 267 268 269 270
// Callback registered with taosOpenRef in raftEntryCacheCreate: forwards `param`
// to syncEntryDestory as a raft entry.
static void freeRaftEntry(void* param) {
  syncEntryDestory((SSyncRaftEntry*)param);
}

271 272 273
// Create a skip-list based raft entry cache that holds at most `maxCount` entries.
// The skip list uses keyFn/cmpFn over SyncIndex-sized keys; a ref set is opened with
// freeRaftEntry as its callback. `pSyncNode` is stored in the cache and used for logging.
// Returns NULL on failure; nothing is leaked in that case.
// NOTE(review): the result of taosOpenRef is stored unchecked — confirm whether a
// failure there needs handling.
SRaftEntryCache* raftEntryCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) {
  SRaftEntryCache* pCache = taosMemoryMalloc(sizeof(SRaftEntryCache));
  if (pCache == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    sError("vgId:%d, raft cache create error", pSyncNode->vgId);
    return NULL;
  }

  pCache->pSkipList =
      tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, sizeof(SyncIndex), cmpFn, SL_ALLOW_DUP_KEY, keyFn);
  if (pCache->pSkipList == NULL) {
    sError("vgId:%d, raft cache create hash error", pSyncNode->vgId);
    // release the half-built cache instead of leaking it
    taosMemoryFree(pCache);
    return NULL;
  }

  taosThreadMutexInit(&(pCache->mutex), NULL);
  pCache->refMgr = taosOpenRef(10, freeRaftEntry);
  pCache->maxCount = maxCount;
  pCache->currentCount = 0;
  pCache->pSyncNode = pSyncNode;

  return pCache;
}

// Tear down a skip-list cache. Under the cache mutex the skip list is destroyed and
// the ref set is closed (refMgr is then set to -1); afterwards the mutex is destroyed
// and the cache struct is freed. NULL is ignored.
void raftEntryCacheDestroy(SRaftEntryCache* pCache) {
  if (pCache == NULL) {
    return;
  }

  taosThreadMutexLock(&pCache->mutex);
  tSkipListDestroy(pCache->pSkipList);
  if (pCache->refMgr != -1) {
    taosCloseRef(pCache->refMgr);
    pCache->refMgr = -1;
  }
  taosThreadMutexUnlock(&pCache->mutex);

  taosThreadMutexDestroy(&(pCache->mutex));
  taosMemoryFree(pCache);
}

// Insert `pEntry` into the skip-list cache and register it in the cache's ref set;
// the id returned by taosAddRef is stored in pEntry->rid.
// success, return 1
// max count reached (nothing inserted), return 0
// A failed skip-list insert or ref registration trips an ASSERT.
int32_t raftEntryCachePutEntry(struct SRaftEntryCache* pCache, SSyncRaftEntry* pEntry) {
  int32_t result = 0;

  taosThreadMutexLock(&pCache->mutex);

  if (pCache->currentCount < pCache->maxCount) {
    SSkipListNode* pInserted = tSkipListPut(pCache->pSkipList, pEntry);
    ASSERT(pInserted != NULL);
    ++(pCache->currentCount);

    pEntry->rid = taosAddRef(pCache->refMgr, pEntry);
    ASSERT(pEntry->rid >= 0);

    sNTrace(pCache->pSyncNode, "raft cache add, type:%s,%d, type2:%s,%d, index:%" PRId64 ", bytes:%d",
            TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType,
            pEntry->index, pEntry->bytes);
    result = 1;
  }

  taosThreadMutexUnlock(&pCache->mutex);
  return result;
}

// Look up `index` via raftEntryCacheGetEntryP and return a newly allocated copy of the
// entry in *ppEntry; the copy's rid is reset to -1 and the copy belongs to the caller.
// find one, return 1
// not found, return 0
// error (lookup error or allocation failure), return -1; on allocation failure
// terrno = TSDB_CODE_OUT_OF_MEMORY
// *ppEntry is NULL whenever the result is not 1.
// NOTE(review): raftEntryCacheGetEntryP calls taosAcquireRef on the cached entry and
// nothing here releases it — confirm whether a matching release is required.
int32_t raftEntryCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
  ASSERT(ppEntry != NULL);
  *ppEntry = NULL;

  SSyncRaftEntry* pEntry = NULL;
  int32_t         code = raftEntryCacheGetEntryP(pCache, index, &pEntry);
  if (code != 1) {
    return code;
  }

  int32_t         bytes = (int32_t)pEntry->bytes;
  SSyncRaftEntry* pCopy = taosMemoryMalloc((int64_t)bytes);
  if (pCopy == NULL) {
    // bail out before memcpy would dereference NULL
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  memcpy(pCopy, pEntry, pEntry->bytes);
  pCopy->rid = -1;
  *ppEntry = pCopy;
  return code;
}

// Look up `index` in the skip list and, on a single match, store the cached entry
// pointer in *ppEntry (no copy) after calling taosAcquireRef on its rid.
// *ppEntry is written only when exactly one match is found.
// find one, return 1
// not found, return 0
// any other match count trips ASSERT(0) and returns -1
int32_t raftEntryCacheGetEntryP(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
  taosThreadMutexLock(&pCache->mutex);

  SyncIndex key = index;
  int32_t   result = 0;

  SArray* pMatches = tSkipListGet(pCache->pSkipList, (char*)(&key));
  int32_t matchCount = taosArrayGetSize(pMatches);

  switch (matchCount) {
    case 0:
      result = 0;
      break;

    case 1: {
      SSkipListNode** ppNode = (SSkipListNode**)taosArrayGet(pMatches, 0);
      ASSERT(*ppNode != NULL);
      *ppEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(*ppNode);
      taosAcquireRef(pCache->refMgr, (*ppEntry)->rid);
      result = 1;
      break;
    }

    default:
      ASSERT(0);
      result = -1;
      break;
  }

  taosArrayDestroy(pMatches);
  taosThreadMutexUnlock(&pCache->mutex);
  return result;
}

// count = -1, clear all
// count >= 0, clear count
// return -1, error
// return delete count
int32_t raftEntryCacheClear(struct SRaftEntryCache* pCache, int32_t count) {
S
Shengliang Guan 已提交
388
  taosThreadMutexLock(&pCache->mutex);
389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425
  int32_t returnCnt = 0;

  if (count == -1) {
    // clear all
    SSkipListIterator* pIter = tSkipListCreateIter(pCache->pSkipList);
    while (tSkipListIterNext(pIter)) {
      SSkipListNode* pNode = tSkipListIterGet(pIter);
      ASSERT(pNode != NULL);
      SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
      syncEntryDestory(pEntry);
      ++returnCnt;
    }
    tSkipListDestroyIter(pIter);

    tSkipListDestroy(pCache->pSkipList);
    pCache->pSkipList =
        tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, sizeof(SyncIndex), cmpFn, SL_ALLOW_DUP_KEY, keyFn);
    ASSERT(pCache->pSkipList != NULL);

  } else {
    // clear count
    int                i = 0;
    SSkipListIterator* pIter = tSkipListCreateIter(pCache->pSkipList);
    SArray*            delNodeArray = taosArrayInit(0, sizeof(SSkipListNode*));

    // free entry
    while (tSkipListIterNext(pIter)) {
      SSkipListNode* pNode = tSkipListIterGet(pIter);
      ASSERT(pNode != NULL);
      if (i++ >= count) {
        break;
      }

      // sDebug("push pNode:%p", pNode);
      taosArrayPush(delNodeArray, &pNode);
      ++returnCnt;
      SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
426 427 428

      // syncEntryDestory(pEntry);
      taosRemoveRef(pCache->refMgr, pEntry->rid);
429 430 431 432 433 434 435 436 437 438 439 440 441 442
    }
    tSkipListDestroyIter(pIter);

    // delete skiplist node
    int32_t arraySize = taosArrayGetSize(delNodeArray);
    for (int32_t i = 0; i < arraySize; ++i) {
      SSkipListNode** ppNode = taosArrayGet(delNodeArray, i);
      // sDebug("get pNode:%p", *ppNode);
      tSkipListRemoveNode(pCache->pSkipList, *ppNode);
    }
    taosArrayDestroy(delNodeArray);
  }

  pCache->currentCount -= returnCnt;
S
Shengliang Guan 已提交
443
  taosThreadMutexUnlock(&pCache->mutex);
444 445
  return returnCnt;
}