syncRaftEntry.c 13.5 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "syncRaftEntry.h"
M
Minghao Li 已提交
18
#include "syncUtil.h"
S
Shengliang Guan 已提交
19
#include "tref.h"
M
Minghao Li 已提交
20

21 22
SSyncRaftEntry* syncEntryBuild(int32_t dataLen) {
  int32_t         bytes = sizeof(SSyncRaftEntry) + dataLen;
23
  SSyncRaftEntry* pEntry = taosMemoryCalloc(1, bytes);
24 25 26 27 28
  if (pEntry == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

M
Minghao Li 已提交
29
  pEntry->bytes = bytes;
M
Minghao Li 已提交
30
  pEntry->dataLen = dataLen;
31
  pEntry->rid = -1;
M
Minghao Li 已提交
32 33 34 35

  return pEntry;
}

36
// Build a raft entry from a client request.
//
// The message type, original RPC type, sequence number and weak flag are
// taken from `pMsg`; `term` and `index` come from the caller. The request
// payload (pMsg->dataLen bytes) is copied into the entry's data area.
//
// Returns NULL when syncEntryBuild() fails.
SSyncRaftEntry* syncEntryBuildFromClientRequest(const SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) {
  SSyncRaftEntry* pResult = syncEntryBuild(pMsg->dataLen);
  if (pResult != NULL) {
    pResult->index = index;
    pResult->term = term;
    pResult->isWeak = pMsg->isWeak;
    pResult->seqNum = pMsg->seqNum;
    pResult->originalRpcType = pMsg->originalRpcType;
    pResult->msgType = pMsg->msgType;
    memcpy(pResult->data, pMsg->data, pMsg->dataLen);
  }

  return pResult;
}

51 52 53
// Build a raft entry that wraps a raw RPC message.
//
// The entry is tagged as TDMT_SYNC_CLIENT_REQUEST and remembers the RPC's own
// message type in `originalRpcType`. `seqNum` and `isWeak` are set to 0, and
// the RPC content (pMsg->contLen bytes) is copied into the entry's data area.
//
// Returns NULL when syncEntryBuild() fails.
SSyncRaftEntry* syncEntryBuildFromRpcMsg(const SRpcMsg* pMsg, SyncTerm term, SyncIndex index) {
  SSyncRaftEntry* pResult = syncEntryBuild(pMsg->contLen);
  if (pResult != NULL) {
    pResult->index = index;
    pResult->term = term;
    pResult->isWeak = 0;
    pResult->seqNum = 0;
    pResult->originalRpcType = pMsg->msgType;
    pResult->msgType = TDMT_SYNC_CLIENT_REQUEST;
    memcpy(pResult->data, pMsg->pCont, pMsg->contLen);
  }

  return pResult;
}

SSyncRaftEntry* syncEntryBuildFromAppendEntries(const SyncAppendEntries* pMsg) {
67
  SSyncRaftEntry* pEntry = syncEntryBuild((int32_t)(pMsg->dataLen));
68
  if (pEntry == NULL) return NULL;
69

70
  memcpy(pEntry, pMsg->data, pMsg->dataLen);
71 72 73
  return pEntry;
}

M
Minghao Li 已提交
74
// Build a no-op raft entry for the given term and index.
//
// The entry is tagged as TDMT_SYNC_CLIENT_REQUEST with original RPC type
// TDMT_SYNC_NOOP. Its payload is a single SMsgHead whose `vgId` is the given
// vgroup id and whose `contLen` is sizeof(SMsgHead).
//
// Returns NULL when syncEntryBuild() fails.
SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId) {
  SSyncRaftEntry* pNoop = syncEntryBuild(sizeof(SMsgHead));
  if (pNoop == NULL) {
    return NULL;
  }

  // Payload first: the data area is exactly one message head.
  SMsgHead* pMsgHead = (SMsgHead*)pNoop->data;
  pMsgHead->contLen = sizeof(SMsgHead);
  pMsgHead->vgId = vgId;

  pNoop->index = index;
  pNoop->term = term;
  pNoop->isWeak = 0;
  pNoop->seqNum = 0;
  pNoop->originalRpcType = TDMT_SYNC_NOOP;
  pNoop->msgType = TDMT_SYNC_CLIENT_REQUEST;

  return pNoop;
}

B
Benguang Zhao 已提交
92
// Free a raft entry. Passing NULL is a no-op (nothing is logged or freed).
void syncEntryDestroy(SSyncRaftEntry* pEntry) {
  if (pEntry == NULL) {
    return;
  }

  sTrace("free entry: %p", pEntry);
  taosMemoryFree(pEntry);
}

M
Minghao Li 已提交
99 100
// Convert a raft entry back into an RPC message.
//
// pRpcMsg->msgType receives the entry's originalRpcType, and a fresh content
// buffer of pEntry->dataLen bytes is obtained from rpcMallocCont() and filled
// with the entry payload.
//
// If the buffer cannot be allocated, pRpcMsg->pCont is left NULL, contLen is
// reset to 0 and terrno is set to TSDB_CODE_OUT_OF_MEMORY, so the caller can
// detect the failure instead of crashing inside memcpy().
void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg) {
  pRpcMsg->msgType = pEntry->originalRpcType;
  pRpcMsg->contLen = (int32_t)(pEntry->dataLen);
  pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
  if (pRpcMsg->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRpcMsg->contLen = 0;
    return;
  }

  memcpy(pRpcMsg->pCont, pEntry->data, pRpcMsg->contLen);
}

106 107
// Create a hash-based raft entry cache that holds at most `maxCount` entries.
//
// The cache owns a hash table keyed by SyncIndex (created with HASH_NO_LOCK;
// access is serialised by the cache's own mutex instead) and keeps a back
// pointer to `pSyncNode`, which is used for logging.
//
// Returns the new cache, or NULL if either allocation fails. On failure
// nothing is leaked: the cache struct is released when the hash table cannot
// be created.
SRaftEntryHashCache* raftCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) {
  SRaftEntryHashCache* pCache = taosMemoryMalloc(sizeof(SRaftEntryHashCache));
  if (pCache == NULL) {
    sError("vgId:%d, raft cache create error", pSyncNode->vgId);
    return NULL;
  }

  pCache->pEntryHash =
      taosHashInit(sizeof(SyncIndex), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  if (pCache->pEntryHash == NULL) {
    sError("vgId:%d, raft cache create hash error", pSyncNode->vgId);
    // The struct was already allocated; free it so this path does not leak.
    taosMemoryFree(pCache);
    return NULL;
  }

  taosThreadMutexInit(&(pCache->mutex), NULL);
  pCache->maxCount = maxCount;
  pCache->currentCount = 0;
  pCache->pSyncNode = pSyncNode;

  return pCache;
}

128
// Tear down a hash-based raft entry cache. Passing NULL is a no-op.
//
// The hash table is cleaned up while holding the cache mutex; the mutex is
// then destroyed and the cache struct itself is freed.
void raftCacheDestroy(SRaftEntryHashCache* pCache) {
  if (pCache == NULL) {
    return;
  }

  taosThreadMutexLock(&pCache->mutex);
  taosHashCleanup(pCache->pEntryHash);
  taosThreadMutexUnlock(&pCache->mutex);

  taosThreadMutexDestroy(&(pCache->mutex));
  taosMemoryFree(pCache);
}

// success, return 1
// max count, return 0
// error, return -1
141
int32_t raftCachePutEntry(struct SRaftEntryHashCache* pCache, SSyncRaftEntry* pEntry) {
S
Shengliang Guan 已提交
142
  taosThreadMutexLock(&pCache->mutex);
M
Minghao Li 已提交
143 144

  if (pCache->currentCount >= pCache->maxCount) {
S
Shengliang Guan 已提交
145
    taosThreadMutexUnlock(&pCache->mutex);
M
Minghao Li 已提交
146 147 148 149 150 151
    return 0;
  }

  taosHashPut(pCache->pEntryHash, &(pEntry->index), sizeof(pEntry->index), pEntry, pEntry->bytes);
  ++(pCache->currentCount);

S
Shengliang Guan 已提交
152 153 154 155
  sNTrace(pCache->pSyncNode, "raft cache add, type:%s,%d, type2:%s,%d, index:%" PRId64 ", bytes:%d",
          TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType,
          pEntry->index, pEntry->bytes);
  taosThreadMutexUnlock(&pCache->mutex);
M
Minghao Li 已提交
156 157 158 159 160 161
  return 1;
}

// success, return 0
// error, return -1
// not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
162
int32_t raftCacheGetEntry(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
M
Minghao Li 已提交
163 164 165 166 167
  if (ppEntry == NULL) {
    return -1;
  }
  *ppEntry = NULL;

S
Shengliang Guan 已提交
168
  taosThreadMutexLock(&pCache->mutex);
M
Minghao Li 已提交
169 170 171 172 173 174
  void* pTmp = taosHashGet(pCache->pEntryHash, &index, sizeof(index));
  if (pTmp != NULL) {
    SSyncRaftEntry* pEntry = pTmp;
    *ppEntry = taosMemoryMalloc(pEntry->bytes);
    memcpy(*ppEntry, pTmp, pEntry->bytes);

S
Shengliang Guan 已提交
175 176 177 178
    sNTrace(pCache->pSyncNode, "raft cache get, type:%s,%d, type2:%s,%d, index:%" PRId64,
            TMSG_INFO((*ppEntry)->msgType), (*ppEntry)->msgType, TMSG_INFO((*ppEntry)->originalRpcType),
            (*ppEntry)->originalRpcType, (*ppEntry)->index);
    taosThreadMutexUnlock(&pCache->mutex);
M
Minghao Li 已提交
179 180 181
    return 0;
  }

S
Shengliang Guan 已提交
182
  taosThreadMutexUnlock(&pCache->mutex);
M
Minghao Li 已提交
183 184 185 186 187 188 189
  terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
  return -1;
}

// success, return 0
// error, return -1
// not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
190
int32_t raftCacheGetEntryP(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
M
Minghao Li 已提交
191 192 193 194 195
  if (ppEntry == NULL) {
    return -1;
  }
  *ppEntry = NULL;

S
Shengliang Guan 已提交
196
  taosThreadMutexLock(&pCache->mutex);
M
Minghao Li 已提交
197 198 199 200 201
  void* pTmp = taosHashGet(pCache->pEntryHash, &index, sizeof(index));
  if (pTmp != NULL) {
    SSyncRaftEntry* pEntry = pTmp;
    *ppEntry = pEntry;

S
Shengliang Guan 已提交
202 203 204 205
    sNTrace(pCache->pSyncNode, "raft cache get, type:%s,%d, type2:%s,%d, index:%" PRId64,
            TMSG_INFO((*ppEntry)->msgType), (*ppEntry)->msgType, TMSG_INFO((*ppEntry)->originalRpcType),
            (*ppEntry)->originalRpcType, (*ppEntry)->index);
    taosThreadMutexUnlock(&pCache->mutex);
M
Minghao Li 已提交
206 207 208
    return 0;
  }

S
Shengliang Guan 已提交
209
  taosThreadMutexUnlock(&pCache->mutex);
M
Minghao Li 已提交
210 211 212 213
  terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
  return -1;
}

214
// Remove the entry stored under `index` from the hash cache.
//
// The entry count is decremented only when taosHashRemove() reports that an
// entry was actually removed, so deleting an index that is not cached can no
// longer drive currentCount out of step with the table (or below zero).
//
// Always returns 0, whether or not the index was present.
int32_t raftCacheDelEntry(struct SRaftEntryHashCache* pCache, SyncIndex index) {
  taosThreadMutexLock(&pCache->mutex);
  if (taosHashRemove(pCache->pEntryHash, &index, sizeof(index)) == 0) {
    --(pCache->currentCount);
  }
  taosThreadMutexUnlock(&pCache->mutex);
  return 0;
}

222
// Fetch a heap-allocated copy of the entry stored under `index` and remove the
// entry from the cache in one locked step.
//
// On success *ppEntry points to a new buffer of pEntry->bytes bytes obtained
// from taosMemoryMalloc(), and the cached entry is gone. In every other case
// *ppEntry is NULL and the cache is left unchanged.
//
// success, return 0
// error, return -1   (ppEntry is NULL, or the copy could not be allocated;
//                     the latter sets terrno = TSDB_CODE_OUT_OF_MEMORY)
// not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
int32_t raftCacheGetAndDel(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
  if (ppEntry == NULL) {
    return -1;
  }
  *ppEntry = NULL;

  taosThreadMutexLock(&pCache->mutex);
  void* pTmp = taosHashGet(pCache->pEntryHash, &index, sizeof(index));
  if (pTmp != NULL) {
    SSyncRaftEntry* pEntry = pTmp;
    *ppEntry = taosMemoryMalloc(pEntry->bytes);
    if (*ppEntry == NULL) {
      // Keep the cached entry: the caller received nothing, so nothing is lost.
      taosThreadMutexUnlock(&pCache->mutex);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    memcpy(*ppEntry, pTmp, pEntry->bytes);

    sNTrace(pCache->pSyncNode, "raft cache get-and-del, type:%s,%d, type2:%s,%d, index:%" PRId64,
            TMSG_INFO((*ppEntry)->msgType), (*ppEntry)->msgType, TMSG_INFO((*ppEntry)->originalRpcType),
            (*ppEntry)->originalRpcType, (*ppEntry)->index);

    taosHashRemove(pCache->pEntryHash, &index, sizeof(index));
    --(pCache->currentCount);

    taosThreadMutexUnlock(&pCache->mutex);
    return 0;
  }

  taosThreadMutexUnlock(&pCache->mutex);
  terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
  return -1;
}

251
int32_t raftCacheClear(struct SRaftEntryHashCache* pCache) {
S
Shengliang Guan 已提交
252
  taosThreadMutexLock(&pCache->mutex);
M
Minghao Li 已提交
253
  taosHashClear(pCache->pEntryHash);
M
Minghao Li 已提交
254
  pCache->currentCount = 0;
S
Shengliang Guan 已提交
255
  taosThreadMutexUnlock(&pCache->mutex);
M
Minghao Li 已提交
256 257 258
  return 0;
}

259 260 261 262 263 264 265
// Skip-list key extractor: given a pointer to an SSyncRaftEntry, return the
// address of its `index` field, which serves as the key.
static char* keyFn(const void* pData) {
  return (char*)&(((SSyncRaftEntry*)pData)->index);
}

static int cmpFn(const void* p1, const void* p2) { return memcmp(p1, p2, sizeof(SyncIndex)); }

266 267
// Destructor callback registered with taosOpenRef(): `param` is a raft entry,
// which is released through syncEntryDestroy().
static void freeRaftEntry(void* param) {
  syncEntryDestroy((SSyncRaftEntry*)param);
}

271 272 273
// Create a skip-list based raft entry cache that holds at most `maxCount`
// entries.
//
// Entries are keyed by their SyncIndex (see keyFn/cmpFn). A reference set is
// opened with freeRaftEntry as its destructor so cached entries can be
// released through the ref manager. `pSyncNode` is kept for logging.
//
// Returns the new cache, or NULL if either allocation fails. On failure
// nothing is leaked: the cache struct is released when the skip list cannot be
// created.
SRaftEntryCache* raftEntryCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) {
  SRaftEntryCache* pCache = taosMemoryMalloc(sizeof(SRaftEntryCache));
  if (pCache == NULL) {
    sError("vgId:%d, raft cache create error", pSyncNode->vgId);
    return NULL;
  }

  pCache->pSkipList =
      tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, sizeof(SyncIndex), cmpFn, SL_ALLOW_DUP_KEY, keyFn);
  if (pCache->pSkipList == NULL) {
    sError("vgId:%d, raft cache create hash error", pSyncNode->vgId);
    // The struct was already allocated; free it so this path does not leak.
    taosMemoryFree(pCache);
    return NULL;
  }

  taosThreadMutexInit(&(pCache->mutex), NULL);
  pCache->refMgr = taosOpenRef(10, freeRaftEntry);
  pCache->maxCount = maxCount;
  pCache->currentCount = 0;
  pCache->pSyncNode = pSyncNode;

  return pCache;
}

// Tear down a skip-list based raft entry cache. Passing NULL is a no-op.
//
// Under the cache mutex the skip list is destroyed and, if a reference set is
// open (refMgr != -1), it is closed and refMgr reset to -1. Afterwards the
// mutex is destroyed and the cache struct is freed.
void raftEntryCacheDestroy(SRaftEntryCache* pCache) {
  if (pCache == NULL) {
    return;
  }

  taosThreadMutexLock(&pCache->mutex);
  tSkipListDestroy(pCache->pSkipList);
  if (pCache->refMgr != -1) {
    taosCloseRef(pCache->refMgr);
    pCache->refMgr = -1;
  }
  taosThreadMutexUnlock(&pCache->mutex);

  taosThreadMutexDestroy(&(pCache->mutex));
  taosMemoryFree(pCache);
}

// Insert `pEntry` into the skip-list cache and register it with the cache's
// reference set; the resulting ref id is stored in pEntry->rid.
//
// Both the skip-list insertion and the ref registration are guarded by ASSERT
// rather than reported through the return value.
//
// success, return 1
// max count, return 0 (cache already holds maxCount entries; nothing stored)
// error, return -1   (documented, but no path in this function returns it)
int32_t raftEntryCachePutEntry(struct SRaftEntryCache* pCache, SSyncRaftEntry* pEntry) {
  int32_t result = 0;

  taosThreadMutexLock(&pCache->mutex);

  if (pCache->currentCount < pCache->maxCount) {
    SSkipListNode* pInserted = tSkipListPut(pCache->pSkipList, pEntry);
    ASSERT(pInserted != NULL);
    ++(pCache->currentCount);

    pEntry->rid = taosAddRef(pCache->refMgr, pEntry);
    ASSERT(pEntry->rid >= 0);

    sNTrace(pCache->pSyncNode, "raft cache add, type:%s,%d, type2:%s,%d, index:%" PRId64 ", bytes:%d",
            TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType,
            pEntry->index, pEntry->bytes);
    result = 1;
  }

  taosThreadMutexUnlock(&pCache->mutex);
  return result;
}

// Look up the entry stored under `index` and return a heap-allocated copy.
//
// The lookup is delegated to raftEntryCacheGetEntryP(). When it finds an
// entry, a private copy of pEntry->bytes bytes is allocated with
// taosMemoryMalloc() and its `rid` is set to -1 (the copy is not registered
// with the reference set). In every other case *ppEntry is NULL.
//
// NOTE(review): raftEntryCacheGetEntryP() calls taosAcquireRef() on the cached
// entry and this function never releases that reference -- confirm whether a
// matching release is expected here or at the call sites.
//
// find one, return 1
// not found, return 0
// error, return -1   (lookup error, or the copy could not be allocated; the
//                     latter sets terrno = TSDB_CODE_OUT_OF_MEMORY)
int32_t raftEntryCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
  ASSERT(ppEntry != NULL);
  SSyncRaftEntry* pEntry = NULL;
  int32_t         code = raftEntryCacheGetEntryP(pCache, index, &pEntry);
  if (code != 1) {
    *ppEntry = NULL;
    return code;
  }

  int32_t bytes = (int32_t)pEntry->bytes;
  *ppEntry = taosMemoryMalloc((int64_t)bytes);
  if (*ppEntry == NULL) {
    // Do not memcpy into a failed allocation; report it to the caller.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  memcpy(*ppEntry, pEntry, pEntry->bytes);
  (*ppEntry)->rid = -1;
  return code;
}

// Look up the entry stored under `index` WITHOUT copying it.
//
// When exactly one skip-list node matches, *ppEntry is set to the entry held
// by that node and taosAcquireRef() is called on its ref id. *ppEntry is not
// written on the other paths.
//
// find one, return 1
// not found, return 0
// error, return -1   (more than one match; also trips ASSERT(0))
int32_t raftEntryCacheGetEntryP(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
  taosThreadMutexLock(&pCache->mutex);

  SyncIndex lookupKey = index;
  int32_t   result = 0;

  SArray* pMatches = tSkipListGet(pCache->pSkipList, (char*)(&lookupKey));
  int32_t numMatches = taosArrayGetSize(pMatches);
  switch (numMatches) {
    case 1: {
      SSkipListNode** ppNode = (SSkipListNode**)taosArrayGet(pMatches, 0);
      ASSERT(*ppNode != NULL);
      *ppEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(*ppNode);
      taosAcquireRef(pCache->refMgr, (*ppEntry)->rid);
      result = 1;
      break;
    }
    case 0:
      result = 0;
      break;
    default:
      // The cache is expected to hold at most one entry per index.
      ASSERT(0);
      result = -1;
      break;
  }
  taosArrayDestroy(pMatches);

  taosThreadMutexUnlock(&pCache->mutex);
  return result;
}

// count = -1, clear all
// count >= 0, clear count
// return -1, error
// return delete count
int32_t raftEntryCacheClear(struct SRaftEntryCache* pCache, int32_t count) {
S
Shengliang Guan 已提交
388
  taosThreadMutexLock(&pCache->mutex);
389 390 391 392 393 394 395 396 397
  int32_t returnCnt = 0;

  if (count == -1) {
    // clear all
    SSkipListIterator* pIter = tSkipListCreateIter(pCache->pSkipList);
    while (tSkipListIterNext(pIter)) {
      SSkipListNode* pNode = tSkipListIterGet(pIter);
      ASSERT(pNode != NULL);
      SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
B
Benguang Zhao 已提交
398
      syncEntryDestroy(pEntry);
399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425
      ++returnCnt;
    }
    tSkipListDestroyIter(pIter);

    tSkipListDestroy(pCache->pSkipList);
    pCache->pSkipList =
        tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, sizeof(SyncIndex), cmpFn, SL_ALLOW_DUP_KEY, keyFn);
    ASSERT(pCache->pSkipList != NULL);

  } else {
    // clear count
    int                i = 0;
    SSkipListIterator* pIter = tSkipListCreateIter(pCache->pSkipList);
    SArray*            delNodeArray = taosArrayInit(0, sizeof(SSkipListNode*));

    // free entry
    while (tSkipListIterNext(pIter)) {
      SSkipListNode* pNode = tSkipListIterGet(pIter);
      ASSERT(pNode != NULL);
      if (i++ >= count) {
        break;
      }

      // sDebug("push pNode:%p", pNode);
      taosArrayPush(delNodeArray, &pNode);
      ++returnCnt;
      SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
426

B
Benguang Zhao 已提交
427
      // syncEntryDestroy(pEntry);
428
      taosRemoveRef(pCache->refMgr, pEntry->rid);
429 430 431 432 433 434 435 436 437 438 439 440 441 442
    }
    tSkipListDestroyIter(pIter);

    // delete skiplist node
    int32_t arraySize = taosArrayGetSize(delNodeArray);
    for (int32_t i = 0; i < arraySize; ++i) {
      SSkipListNode** ppNode = taosArrayGet(delNodeArray, i);
      // sDebug("get pNode:%p", *ppNode);
      tSkipListRemoveNode(pCache->pSkipList, *ppNode);
    }
    taosArrayDestroy(delNodeArray);
  }

  pCache->currentCount -= returnCnt;
S
Shengliang Guan 已提交
443
  taosThreadMutexUnlock(&pCache->mutex);
444 445
  return returnCnt;
}