syncMessage.c 9.2 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "syncMessage.h"
#include "syncRaftEntry.h"

int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS,
S
Shengliang Guan 已提交
21 22
                         SSyncNode* pNode) {
  int32_t bytes = sizeof(SyncTimeout);
S
Shengliang Guan 已提交
23 24 25 26
  pMsg->pCont = rpcMallocCont(bytes);
  pMsg->msgType = TDMT_SYNC_TIMEOUT;
  pMsg->contLen = bytes;
  if (pMsg->pCont == NULL) {
S
Shengliang Guan 已提交
27 28
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
M
Minghao Li 已提交
29
  }
M
Minghao Li 已提交
30

S
Shengliang Guan 已提交
31
  SyncTimeout* pTimeout = pMsg->pCont;
S
Shengliang Guan 已提交
32 33 34 35 36 37 38 39
  pTimeout->bytes = bytes;
  pTimeout->msgType = TDMT_SYNC_TIMEOUT;
  pTimeout->vgId = pNode->vgId;
  pTimeout->timeoutType = timeoutType;
  pTimeout->logicClock = logicClock;
  pTimeout->timerMS = timerMS;
  pTimeout->data = pNode;
  return 0;
M
Minghao Li 已提交
40 41
}

S
Shengliang Guan 已提交
42 43 44 45 46 47
/*
 * Fill pMsg with a newly allocated TDMT_SYNC_CLIENT_REQUEST message that
 * carries a copy of pOriginal's payload.
 *
 * The body is a SyncClientRequest header followed by pOriginal->contLen
 * payload bytes, allocated in one piece with rpcMallocCont(). The header
 * records vgId, the original message type, seqNum, isWeak and the payload
 * length.
 *
 * Returns 0 on success. If rpcMallocCont() returns NULL, terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY and -1 is returned; pMsg->msgType and
 * pMsg->contLen have already been written in that case.
 */
int32_t syncBuildClientRequest(SRpcMsg* pMsg, const SRpcMsg* pOriginal, uint64_t seqNum, bool isWeak, int32_t vgId) {
  int32_t bytes = sizeof(SyncClientRequest) + pOriginal->contLen;
  pMsg->pCont = rpcMallocCont(bytes);
  pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
  pMsg->contLen = bytes;
  if (pMsg->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncClientRequest* pClientRequest = pMsg->pCont;
  pClientRequest->bytes = bytes;
  pClientRequest->vgId = vgId;
  pClientRequest->msgType = TDMT_SYNC_CLIENT_REQUEST;
  pClientRequest->originalRpcType = pOriginal->msgType;
  pClientRequest->seqNum = seqNum;
  pClientRequest->isWeak = isWeak;
  pClientRequest->dataLen = pOriginal->contLen;

  // memcpy() needs valid pointers even for a zero-length copy, so an empty
  // payload (where pOriginal->pCont may be NULL) is skipped instead of copied.
  if (pOriginal->contLen > 0) {
    memcpy(pClientRequest->data, (char*)pOriginal->pCont, pOriginal->contLen);
  }

  return 0;
}

S
Shengliang Guan 已提交
65
/*
 * Fill pMsg with a newly allocated TDMT_SYNC_CLIENT_REQUEST message whose
 * payload is a byte-for-byte copy of the raft entry pEntry.
 *
 * pEntry->bytes bytes, starting at pEntry itself, are copied after the
 * SyncClientRequest header. The header's originalRpcType is set to
 * TDMT_SYNC_NOOP; seqNum and isWeak are not assigned by this function.
 *
 * Returns 0 on success. If rpcMallocCont() returns NULL, terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY and -1 is returned; pMsg->msgType and
 * pMsg->contLen have already been written in that case.
 */
int32_t syncBuildClientRequestFromNoopEntry(SRpcMsg* pMsg, const SSyncRaftEntry* pEntry, int32_t vgId) {
  const int32_t msgSize = sizeof(SyncClientRequest) + pEntry->bytes;

  pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
  pMsg->contLen = msgSize;
  pMsg->pCont = rpcMallocCont(msgSize);
  if (!pMsg->pCont) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncClientRequest* pReq = pMsg->pCont;
  pReq->bytes = msgSize;
  pReq->vgId = vgId;
  pReq->msgType = TDMT_SYNC_CLIENT_REQUEST;
  pReq->originalRpcType = TDMT_SYNC_NOOP;
  pReq->dataLen = pEntry->bytes;
  // The whole entry, header included, becomes the request payload.
  memcpy(pReq->data, (char*)pEntry, pEntry->bytes);

  return 0;
}

S
Shengliang Guan 已提交
86 87 88
/*
 * Fill pMsg with a newly allocated TDMT_SYNC_REQUEST_VOTE message.
 *
 * Only the bytes, msgType and vgId fields of the SyncRequestVote body are
 * assigned here; every other field is left as rpcMallocCont() returned it.
 *
 * Returns 0 on success. If rpcMallocCont() returns NULL, terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY and -1 is returned; pMsg->msgType and
 * pMsg->contLen have already been written in that case.
 */
int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId) {
  const int32_t msgSize = sizeof(SyncRequestVote);

  pMsg->msgType = TDMT_SYNC_REQUEST_VOTE;
  pMsg->contLen = msgSize;
  pMsg->pCont = rpcMallocCont(msgSize);
  if (!pMsg->pCont) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncRequestVote* pBody = pMsg->pCont;
  pBody->bytes = msgSize;
  pBody->msgType = TDMT_SYNC_REQUEST_VOTE;
  pBody->vgId = vgId;
  return 0;
}

S
Shengliang Guan 已提交
103 104 105
/*
 * Fill pMsg with a newly allocated TDMT_SYNC_REQUEST_VOTE_REPLY message.
 *
 * Only the bytes, msgType and vgId fields of the SyncRequestVoteReply body
 * are assigned here; every other field is left as rpcMallocCont() returned
 * it.
 *
 * Returns 0 on success. If rpcMallocCont() returns NULL, terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY and -1 is returned; pMsg->msgType and
 * pMsg->contLen have already been written in that case.
 */
int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId) {
  const int32_t msgSize = sizeof(SyncRequestVoteReply);

  pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY;
  pMsg->contLen = msgSize;
  pMsg->pCont = rpcMallocCont(msgSize);
  if (!pMsg->pCont) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncRequestVoteReply* pBody = pMsg->pCont;
  pBody->bytes = msgSize;
  pBody->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY;
  pBody->vgId = vgId;
  return 0;
}

120 121 122
/*
 * Fill pMsg with a newly allocated TDMT_SYNC_APPEND_ENTRIES message that has
 * room for dataLen payload bytes after the SyncAppendEntries header.
 *
 * The header's bytes, vgId, msgType and dataLen fields are assigned; the
 * payload area itself is not written by this function.
 *
 * Returns 0 on success. If rpcMallocCont() returns NULL, terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY and -1 is returned; pMsg->msgType and
 * pMsg->contLen have already been written in that case.
 */
int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) {
  const int32_t msgSize = sizeof(SyncAppendEntries) + dataLen;

  pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES;
  pMsg->contLen = msgSize;
  pMsg->pCont = rpcMallocCont(msgSize);
  if (!pMsg->pCont) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncAppendEntries* pBody = pMsg->pCont;
  pBody->bytes = msgSize;
  pBody->vgId = vgId;
  pBody->msgType = TDMT_SYNC_APPEND_ENTRIES;
  pBody->dataLen = dataLen;
  return 0;
}

138
/*
 * Fill pMsg with a newly allocated TDMT_SYNC_APPEND_ENTRIES_REPLY message.
 *
 * Only the bytes, msgType and vgId fields of the SyncAppendEntriesReply body
 * are assigned here; every other field is left as rpcMallocCont() returned
 * it.
 *
 * Returns 0 on success. If rpcMallocCont() returns NULL, terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY and -1 is returned; pMsg->msgType and
 * pMsg->contLen have already been written in that case.
 */
int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId) {
  const int32_t msgSize = sizeof(SyncAppendEntriesReply);

  pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY;
  pMsg->contLen = msgSize;
  pMsg->pCont = rpcMallocCont(msgSize);
  if (!pMsg->pCont) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncAppendEntriesReply* pBody = pMsg->pCont;
  pBody->bytes = msgSize;
  pBody->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY;
  pBody->vgId = vgId;
  return 0;
}
M
Minghao Li 已提交
154

S
Shengliang Guan 已提交
155 156 157
/*
 * Fill pMsg with a newly allocated TDMT_SYNC_HEARTBEAT message.
 *
 * Only the bytes, msgType and vgId fields of the SyncHeartbeat body are
 * assigned here; every other field is left as rpcMallocCont() returned it.
 *
 * Returns 0 on success. If rpcMallocCont() returns NULL, terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY and -1 is returned; pMsg->msgType and
 * pMsg->contLen have already been written in that case.
 */
int32_t syncBuildHeartbeat(SRpcMsg* pMsg, int32_t vgId) {
  const int32_t msgSize = sizeof(SyncHeartbeat);

  pMsg->msgType = TDMT_SYNC_HEARTBEAT;
  pMsg->contLen = msgSize;
  pMsg->pCont = rpcMallocCont(msgSize);
  if (!pMsg->pCont) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncHeartbeat* pBody = pMsg->pCont;
  pBody->bytes = msgSize;
  pBody->msgType = TDMT_SYNC_HEARTBEAT;
  pBody->vgId = vgId;
  return 0;
}

172 173 174
/*
 * Fill pMsg with a newly allocated TDMT_SYNC_HEARTBEAT_REPLY message.
 *
 * Only the bytes, msgType and vgId fields of the SyncHeartbeatReply body are
 * assigned here; every other field is left as rpcMallocCont() returned it.
 *
 * Returns 0 on success. If rpcMallocCont() returns NULL, terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY and -1 is returned; pMsg->msgType and
 * pMsg->contLen have already been written in that case.
 */
int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId) {
  const int32_t msgSize = sizeof(SyncHeartbeatReply);

  pMsg->msgType = TDMT_SYNC_HEARTBEAT_REPLY;
  pMsg->contLen = msgSize;
  pMsg->pCont = rpcMallocCont(msgSize);
  if (!pMsg->pCont) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncHeartbeatReply* pBody = pMsg->pCont;
  pBody->bytes = msgSize;
  pBody->msgType = TDMT_SYNC_HEARTBEAT_REPLY;
  pBody->vgId = vgId;
  return 0;
}

S
Shengliang Guan 已提交
189
#if 0
S
Shengliang Guan 已提交
190 191 192
/*
 * Fill pMsg with a newly allocated TDMT_SYNC_PRE_SNAPSHOT message.
 * (This definition sits inside an `#if 0` region and is not compiled.)
 *
 * Only the bytes, msgType and vgId fields of the SyncPreSnapshot body are
 * assigned here; every other field is left as rpcMallocCont() returned it.
 *
 * Returns 0 on success. If rpcMallocCont() returns NULL, terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY and -1 is returned; pMsg->msgType and
 * pMsg->contLen have already been written in that case.
 */
int32_t syncBuildPreSnapshot(SRpcMsg* pMsg, int32_t vgId) {
  const int32_t msgSize = sizeof(SyncPreSnapshot);

  pMsg->msgType = TDMT_SYNC_PRE_SNAPSHOT;
  pMsg->contLen = msgSize;
  pMsg->pCont = rpcMallocCont(msgSize);
  if (!pMsg->pCont) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncPreSnapshot* pBody = pMsg->pCont;
  pBody->bytes = msgSize;
  pBody->msgType = TDMT_SYNC_PRE_SNAPSHOT;
  pBody->vgId = vgId;
  return 0;
}

S
Shengliang Guan 已提交
207 208 209
/*
 * Fill pMsg with a newly allocated TDMT_SYNC_PRE_SNAPSHOT_REPLY message.
 * (This definition sits inside an `#if 0` region and is not compiled.)
 *
 * Only the bytes, msgType and vgId fields of the SyncPreSnapshotReply body
 * are assigned here; every other field is left as rpcMallocCont() returned
 * it.
 *
 * Returns 0 on success. If rpcMallocCont() returns NULL, terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY and -1 is returned; pMsg->msgType and
 * pMsg->contLen have already been written in that case.
 */
int32_t syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId) {
  const int32_t msgSize = sizeof(SyncPreSnapshotReply);

  pMsg->msgType = TDMT_SYNC_PRE_SNAPSHOT_REPLY;
  pMsg->contLen = msgSize;
  pMsg->pCont = rpcMallocCont(msgSize);
  if (!pMsg->pCont) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncPreSnapshotReply* pBody = pMsg->pCont;
  pBody->bytes = msgSize;
  pBody->msgType = TDMT_SYNC_PRE_SNAPSHOT_REPLY;
  pBody->vgId = vgId;
  return 0;
}
S
Shengliang Guan 已提交
223
#endif
M
Minghao Li 已提交
224

225 226 227
/*
 * Fill pMsg with a newly allocated TDMT_SYNC_SNAPSHOT_SEND message that has
 * room for dataLen payload bytes after the SyncSnapshotSend header.
 *
 * The header's bytes, vgId, msgType and dataLen fields are assigned; the
 * payload area itself is not written by this function.
 *
 * Returns 0 on success. If rpcMallocCont() returns NULL, terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY and -1 is returned; pMsg->msgType and
 * pMsg->contLen have already been written in that case.
 */
int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) {
  const int32_t msgSize = sizeof(SyncSnapshotSend) + dataLen;

  pMsg->msgType = TDMT_SYNC_SNAPSHOT_SEND;
  pMsg->contLen = msgSize;
  pMsg->pCont = rpcMallocCont(msgSize);
  if (!pMsg->pCont) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncSnapshotSend* pBody = pMsg->pCont;
  pBody->bytes = msgSize;
  pBody->vgId = vgId;
  pBody->msgType = TDMT_SYNC_SNAPSHOT_SEND;
  pBody->dataLen = dataLen;
  return 0;
}

243 244 245
/*
 * Fill pMsg with a newly allocated TDMT_SYNC_SNAPSHOT_RSP message.
 *
 * Only the bytes, msgType and vgId fields of the SyncSnapshotRsp body are
 * assigned here; every other field is left as rpcMallocCont() returned it.
 *
 * Returns 0 on success. If rpcMallocCont() returns NULL, terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY and -1 is returned; pMsg->msgType and
 * pMsg->contLen have already been written in that case.
 */
int32_t syncBuildSnapshotSendRsp(SRpcMsg* pMsg, int32_t vgId) {
  const int32_t msgSize = sizeof(SyncSnapshotRsp);

  pMsg->msgType = TDMT_SYNC_SNAPSHOT_RSP;
  pMsg->contLen = msgSize;
  pMsg->pCont = rpcMallocCont(msgSize);
  if (!pMsg->pCont) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncSnapshotRsp* pBody = pMsg->pCont;
  pBody->bytes = msgSize;
  pBody->msgType = TDMT_SYNC_SNAPSHOT_RSP;
  pBody->vgId = vgId;
  return 0;
}

260 261 262
/*
 * Fill pMsg with a newly allocated TDMT_SYNC_LEADER_TRANSFER message.
 *
 * Only the bytes, msgType and vgId fields of the SyncLeaderTransfer body are
 * assigned here; every other field is left as rpcMallocCont() returned it.
 *
 * Returns 0 on success. If rpcMallocCont() returns NULL, terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY and -1 is returned; pMsg->msgType and
 * pMsg->contLen have already been written in that case.
 */
int32_t syncBuildLeaderTransfer(SRpcMsg* pMsg, int32_t vgId) {
  const int32_t msgSize = sizeof(SyncLeaderTransfer);

  pMsg->msgType = TDMT_SYNC_LEADER_TRANSFER;
  pMsg->contLen = msgSize;
  pMsg->pCont = rpcMallocCont(msgSize);
  if (!pMsg->pCont) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncLeaderTransfer* pBody = pMsg->pCont;
  pBody->bytes = msgSize;
  pBody->msgType = TDMT_SYNC_LEADER_TRANSFER;
  pBody->vgId = vgId;
  return 0;
}

S
Shengliang Guan 已提交
277 278 279
/*
 * Fill pMsg with a newly allocated TDMT_SYNC_LOCAL_CMD message.
 *
 * Only the bytes, msgType and vgId fields of the SyncLocalCmd body are
 * assigned here; every other field is left as rpcMallocCont() returned it.
 *
 * Returns 0 on success. If rpcMallocCont() returns NULL, terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY and -1 is returned; pMsg->msgType and
 * pMsg->contLen have already been written in that case.
 */
int32_t syncBuildLocalCmd(SRpcMsg* pMsg, int32_t vgId) {
  const int32_t msgSize = sizeof(SyncLocalCmd);

  pMsg->msgType = TDMT_SYNC_LOCAL_CMD;
  pMsg->contLen = msgSize;
  pMsg->pCont = rpcMallocCont(msgSize);
  if (!pMsg->pCont) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncLocalCmd* pBody = pMsg->pCont;
  pBody->bytes = msgSize;
  pBody->msgType = TDMT_SYNC_LOCAL_CMD;
  pBody->vgId = vgId;
  return 0;
}

S
Shengliang Guan 已提交
294 295 296 297 298 299 300 301 302 303
/*
 * Map a sync timeout type to a short, constant, human-readable label.
 *
 * Returns "ping", "elect" or "heartbeat" for the three recognised timeout
 * types, and "unknown" for any other value.
 */
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType) {
  if (timerType == SYNC_TIMEOUT_PING) {
    return "ping";
  }
  if (timerType == SYNC_TIMEOUT_ELECTION) {
    return "elect";
  }
  if (timerType == SYNC_TIMEOUT_HEARTBEAT) {
    return "heartbeat";
  }
  return "unknown";
}

S
Shengliang Guan 已提交
307 308 309 310 311 312 313 314
/*
 * Map a local sync command to a short, constant, human-readable label.
 *
 * Returns "step-down" or "follower-commit" for the two recognised commands,
 * and "unknown-local-cmd" for any other value.
 */
const char* syncLocalCmdGetStr(ESyncLocalCmd cmd) {
  if (cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
    return "step-down";
  }
  if (cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
    return "follower-commit";
  }
  return "unknown-local-cmd";
}