syncRaftEntry.c 3.1 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "syncRaftEntry.h"
M
Minghao Li 已提交
18
#include "syncUtil.h"
S
Shengliang Guan 已提交
19
#include "tref.h"
M
Minghao Li 已提交
20

21 22
SSyncRaftEntry* syncEntryBuild(int32_t dataLen) {
  int32_t         bytes = sizeof(SSyncRaftEntry) + dataLen;
23
  SSyncRaftEntry* pEntry = taosMemoryCalloc(1, bytes);
24 25 26 27 28
  if (pEntry == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

M
Minghao Li 已提交
29
  pEntry->bytes = bytes;
M
Minghao Li 已提交
30
  pEntry->dataLen = dataLen;
31
  pEntry->rid = -1;
M
Minghao Li 已提交
32 33 34 35

  return pEntry;
}

36
SSyncRaftEntry* syncEntryBuildFromClientRequest(const SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) {
M
Minghao Li 已提交
37
  SSyncRaftEntry* pEntry = syncEntryBuild(pMsg->dataLen);
38
  if (pEntry == NULL) return NULL;
M
Minghao Li 已提交
39

M
Minghao Li 已提交
40 41 42 43 44 45 46 47 48 49 50
  pEntry->msgType = pMsg->msgType;
  pEntry->originalRpcType = pMsg->originalRpcType;
  pEntry->seqNum = pMsg->seqNum;
  pEntry->isWeak = pMsg->isWeak;
  pEntry->term = term;
  pEntry->index = index;
  memcpy(pEntry->data, pMsg->data, pMsg->dataLen);

  return pEntry;
}

51 52 53
SSyncRaftEntry* syncEntryBuildFromRpcMsg(const SRpcMsg* pMsg, SyncTerm term, SyncIndex index) {
  SSyncRaftEntry* pEntry = syncEntryBuild(pMsg->contLen);
  if (pEntry == NULL) return NULL;
54 55

  pEntry->msgType = TDMT_SYNC_CLIENT_REQUEST;
56
  pEntry->originalRpcType = pMsg->msgType;
57 58 59 60
  pEntry->seqNum = 0;
  pEntry->isWeak = 0;
  pEntry->term = term;
  pEntry->index = index;
61 62 63 64 65 66
  memcpy(pEntry->data, pMsg->pCont, pMsg->contLen);

  return pEntry;
}

SSyncRaftEntry* syncEntryBuildFromAppendEntries(const SyncAppendEntries* pMsg) {
67
  SSyncRaftEntry* pEntry = syncEntryBuild((int32_t)(pMsg->dataLen));
68
  if (pEntry == NULL) return NULL;
69

70
  memcpy(pEntry, pMsg->data, pMsg->dataLen);
71 72 73
  return pEntry;
}

M
Minghao Li 已提交
74
SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId) {
75 76
  SSyncRaftEntry* pEntry = syncEntryBuild(sizeof(SMsgHead));
  if (pEntry == NULL) return NULL;
M
Minghao Li 已提交
77

78 79
  pEntry->msgType = TDMT_SYNC_CLIENT_REQUEST;
  pEntry->originalRpcType = TDMT_SYNC_NOOP;
M
Minghao Li 已提交
80 81
  pEntry->seqNum = 0;
  pEntry->isWeak = 0;
M
Minghao Li 已提交
82 83
  pEntry->term = term;
  pEntry->index = index;
M
Minghao Li 已提交
84

85 86 87
  SMsgHead* pHead = (SMsgHead*)pEntry->data;
  pHead->vgId = vgId;
  pHead->contLen = sizeof(SMsgHead);
M
Minghao Li 已提交
88 89 90 91

  return pEntry;
}

B
Benguang Zhao 已提交
92
void syncEntryDestroy(SSyncRaftEntry* pEntry) {
M
Minghao Li 已提交
93
  if (pEntry != NULL) {
S
Shengliang Guan 已提交
94
    sTrace("free entry:%p", pEntry);
95
    taosMemoryFree(pEntry);
M
Minghao Li 已提交
96
  }
M
Minghao Li 已提交
97 98
}

M
Minghao Li 已提交
99 100
void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg) {
  pRpcMsg->msgType = pEntry->originalRpcType;
101
  pRpcMsg->contLen = (int32_t)(pEntry->dataLen);
M
Minghao Li 已提交
102 103 104
  pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
  memcpy(pRpcMsg->pCont, pEntry->data, pRpcMsg->contLen);
}