syncRaftEntry.c 5.8 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

M
Minghao Li 已提交
16
#include "syncRaftEntry.h"
M
Minghao Li 已提交
17
#include "syncUtil.h"
M
Minghao Li 已提交
18

M
Minghao Li 已提交
19 20
// Allocate a zero-filled raft entry with room for `dataLen` payload bytes.
//
// The fixed SSyncRaftEntry header and the payload share a single allocation.
// On return `bytes` holds the total allocation size and `dataLen` the payload
// size; every other field is zero.
//
// Returns the new entry (release it with syncEntryDestory), or NULL when the
// total size cannot be represented in a uint32_t, or when the allocation
// fails in a build where assert() is compiled out.
SSyncRaftEntry* syncEntryBuild(uint32_t dataLen) {
  // The total size is stored in a uint32_t. Reject payload lengths that would
  // make header + payload wrap, otherwise a too-small buffer would be
  // allocated while dataLen still advertises the full payload size.
  if (dataLen > UINT32_MAX - sizeof(SSyncRaftEntry)) {
    return NULL;
  }
  uint32_t bytes = (uint32_t)(sizeof(SSyncRaftEntry) + dataLen);

  SSyncRaftEntry* pEntry = taosMemoryMalloc(bytes);
  assert(pEntry != NULL);
  if (pEntry == NULL) {
    // Only reachable when assert() is disabled; never memset a NULL pointer.
    return NULL;
  }

  memset(pEntry, 0, bytes);
  pEntry->bytes = bytes;
  pEntry->dataLen = dataLen;
  return pEntry;
}

M
Minghao Li 已提交
29
// step 4. SyncClientRequest => SSyncRaftEntry, add term, index
M
Minghao Li 已提交
30
SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) {
M
Minghao Li 已提交
31
  SSyncRaftEntry* pEntry = syncEntryBuild3(pMsg, term, index);
M
Minghao Li 已提交
32 33 34 35 36
  assert(pEntry != NULL);

  return pEntry;
}

M
Minghao Li 已提交
37
// Build a raft entry from a client request.
//
// Allocates an entry sized for pMsg->dataLen via syncEntryBuild, copies the
// request's msgType, originalRpcType, seqNum, isWeak, dataLen and payload
// bytes into it, and records the caller-supplied `term` and `index`.
SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) {
  SSyncRaftEntry* pNew = syncEntryBuild(pMsg->dataLen);
  assert(pNew != NULL);

  // Log position chosen by the caller.
  pNew->term = term;
  pNew->index = index;

  // Metadata carried over from the request.
  pNew->msgType = pMsg->msgType;
  pNew->originalRpcType = pMsg->originalRpcType;
  pNew->seqNum = pMsg->seqNum;
  pNew->isWeak = pMsg->isWeak;

  // Payload length and bytes.
  pNew->dataLen = pMsg->dataLen;
  memcpy(pNew->data, pMsg->data, pMsg->dataLen);

  return pNew;
}

M
Minghao Li 已提交
53 54 55 56 57 58 59 60 61 62 63 64 65
// Build a no-op raft entry for the given term and index.
//
// The payload is a single SMsgHead whose vgId is `vgId` and whose contLen is
// sizeof(SMsgHead). The entry is tagged msgType = TDMT_VND_SYNC_CLIENT_REQUEST
// and originalRpcType = TDMT_VND_SYNC_NOOP, with seqNum and isWeak set to 0.
//
// Returns the new entry (release it with syncEntryDestory), or NULL if the
// entry could not be allocated in a build where assert() is compiled out.
SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId) {
  // Zero the whole header before filling it in, so padding and any member not
  // assigned below are deterministic rather than stack garbage that would be
  // copied verbatim into the entry payload.
  SMsgHead head;
  memset(&head, 0, sizeof(head));
  head.vgId = vgId;
  head.contLen = sizeof(SMsgHead);

  SSyncRaftEntry* pEntry = syncEntryBuild((uint32_t)sizeof(head));
  assert(pEntry != NULL);
  if (pEntry == NULL) {
    return NULL;
  }

  pEntry->msgType = TDMT_VND_SYNC_CLIENT_REQUEST;
  pEntry->originalRpcType = TDMT_VND_SYNC_NOOP;
  pEntry->seqNum = 0;
  pEntry->isWeak = 0;
  pEntry->term = term;
  pEntry->index = index;

  // Copy the header straight from the stack. Staging it in a temporary RPC
  // buffer would add an allocation that can fail without changing the bytes
  // that end up in the entry.
  assert(pEntry->dataLen == sizeof(head));
  memcpy(pEntry->data, &head, sizeof(head));

  return pEntry;
}

M
Minghao Li 已提交
82 83
// Release an entry with taosMemoryFree. A NULL entry is ignored.
void syncEntryDestory(SSyncRaftEntry* pEntry) {
  if (pEntry == NULL) {
    return;
  }
  taosMemoryFree(pEntry);
}

M
Minghao Li 已提交
88
// step 5. SSyncRaftEntry => bin, to raft log
M
Minghao Li 已提交
89
char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len) {
wafwerar's avatar
wafwerar 已提交
90
  char* buf = taosMemoryMalloc(pEntry->bytes);
M
Minghao Li 已提交
91
  assert(buf != NULL);
M
Minghao Li 已提交
92
  memcpy(buf, pEntry, pEntry->bytes);
M
Minghao Li 已提交
93 94 95 96
  if (len != NULL) {
    *len = pEntry->bytes;
  }
  return buf;
M
Minghao Li 已提交
97 98
}

M
Minghao Li 已提交
99
// step 6. bin => SSyncRaftEntry, from raft log
M
Minghao Li 已提交
100 101
SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len) {
  uint32_t        bytes = *((uint32_t*)buf);
wafwerar's avatar
wafwerar 已提交
102
  SSyncRaftEntry* pEntry = taosMemoryMalloc(bytes);
M
Minghao Li 已提交
103
  assert(pEntry != NULL);
M
Minghao Li 已提交
104 105
  memcpy(pEntry, buf, len);
  assert(len == pEntry->bytes);
M
Minghao Li 已提交
106
  return pEntry;
M
Minghao Li 已提交
107 108 109
}

cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry) {
M
Minghao Li 已提交
110
  char   u64buf[128];
M
Minghao Li 已提交
111 112
  cJSON* pRoot = cJSON_CreateObject();

M
Minghao Li 已提交
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
  if (pEntry != NULL) {
    cJSON_AddNumberToObject(pRoot, "bytes", pEntry->bytes);
    cJSON_AddNumberToObject(pRoot, "msgType", pEntry->msgType);
    cJSON_AddNumberToObject(pRoot, "originalRpcType", pEntry->originalRpcType);
    snprintf(u64buf, sizeof(u64buf), "%lu", pEntry->seqNum);
    cJSON_AddStringToObject(pRoot, "seqNum", u64buf);
    cJSON_AddNumberToObject(pRoot, "isWeak", pEntry->isWeak);
    snprintf(u64buf, sizeof(u64buf), "%lu", pEntry->term);
    cJSON_AddStringToObject(pRoot, "term", u64buf);
    snprintf(u64buf, sizeof(u64buf), "%lu", pEntry->index);
    cJSON_AddStringToObject(pRoot, "index", u64buf);
    cJSON_AddNumberToObject(pRoot, "dataLen", pEntry->dataLen);

    char* s;
    s = syncUtilprintBin((char*)(pEntry->data), pEntry->dataLen);
    cJSON_AddStringToObject(pRoot, "data", s);
wafwerar's avatar
wafwerar 已提交
129
    taosMemoryFree(s);
M
Minghao Li 已提交
130

M
Minghao Li 已提交
131 132
    s = syncUtilprintBin2((char*)(pEntry->data), pEntry->dataLen);
    cJSON_AddStringToObject(pRoot, "data2", s);
wafwerar's avatar
wafwerar 已提交
133
    taosMemoryFree(s);
M
Minghao Li 已提交
134
  }
M
Minghao Li 已提交
135

M
Minghao Li 已提交
136 137 138 139 140 141 142 143 144 145
  cJSON* pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SSyncRaftEntry", pRoot);
  return pJson;
}

// Render an entry as JSON text.
//
// Builds the tree with syncEntry2Json, prints it with cJSON_Print, and deletes
// the tree. The caller owns the returned string.
char* syncEntry2Str(const SSyncRaftEntry* pEntry) {
  cJSON* pTree = syncEntry2Json(pEntry);
  char*  text = cJSON_Print(pTree);

  // The printed text is independent of the tree, which is no longer needed.
  cJSON_Delete(pTree);
  return text;
}

M
Minghao Li 已提交
148 149 150 151 152 153 154 155 156
// Step 7: SSyncRaftEntry => original SRpcMsg, to be committed to the user.
//
// Only the original RPC type and the payload are carried over; seqNum, isWeak,
// term and index are dropped. *pRpcMsg is zeroed first, then given a fresh
// rpcMallocCont buffer holding a copy of the entry's payload.
void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg) {
  memset(pRpcMsg, 0, sizeof(SRpcMsg));

  // Payload: size the content buffer from the entry, then copy the bytes.
  pRpcMsg->contLen = pEntry->dataLen;
  pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
  memcpy(pRpcMsg->pCont, pEntry->data, pRpcMsg->contLen);

  // Restore the message type the request had before it was wrapped.
  pRpcMsg->msgType = pEntry->originalRpcType;
}

M
Minghao Li 已提交
157
// for debug ----------------------
M
Minghao Li 已提交
158 159
// Debug helper: print the entry's JSON text and its length to stdout, then
// flush all open output streams. Prints nothing if the text is unavailable.
void syncEntryPrint(const SSyncRaftEntry* pObj) {
  char* serialized = syncEntry2Str(pObj);
  // syncEntry2Str returns the cJSON_Print result, which is NULL on failure;
  // passing NULL to strlen or "%s" is undefined behaviour.
  if (serialized == NULL) {
    return;
  }
  printf("syncEntryPrint | len:%zu | %s \n", strlen(serialized), serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

M
Minghao Li 已提交
165 166
// Debug helper: like syncEntryPrint, but prefixes the JSON text with the
// caller-supplied label `s`. Output goes to stdout and all open output streams
// are flushed afterwards. Prints nothing if the text is unavailable.
void syncEntryPrint2(char* s, const SSyncRaftEntry* pObj) {
  char* serialized = syncEntry2Str(pObj);
  // syncEntry2Str returns the cJSON_Print result, which is NULL on failure;
  // passing NULL to strlen or "%s" is undefined behaviour.
  if (serialized == NULL) {
    return;
  }
  printf("syncEntryPrint2 | len:%zu | %s | %s \n", strlen(serialized), s, serialized);
  fflush(NULL);
  taosMemoryFree(serialized);
}

M
Minghao Li 已提交
172 173
// Debug helper: emit the entry's JSON text and its length through sTrace.
// Logs nothing if the text is unavailable.
void syncEntryLog(const SSyncRaftEntry* pObj) {
  char* serialized = syncEntry2Str(pObj);
  // syncEntry2Str returns the cJSON_Print result, which is NULL on failure;
  // passing NULL to strlen or "%s" is undefined behaviour.
  if (serialized == NULL) {
    return;
  }
  sTrace("syncEntryLog | len:%zu | %s", strlen(serialized), serialized);
  taosMemoryFree(serialized);
}

M
Minghao Li 已提交
178 179
// Debug helper: like syncEntryLog, but prefixes the JSON text with the
// caller-supplied label `s`. Logs nothing if the text is unavailable.
void syncEntryLog2(char* s, const SSyncRaftEntry* pObj) {
  char* serialized = syncEntry2Str(pObj);
  // syncEntry2Str returns the cJSON_Print result, which is NULL on failure;
  // passing NULL to strlen or "%s" is undefined behaviour.
  if (serialized == NULL) {
    return;
  }
  sTrace("syncEntryLog2 | len:%zu | %s | %s", strlen(serialized), s, serialized);
  taosMemoryFree(serialized);
}