syncMessage.c 10.1 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#define _DEFAULT_SOURCE
M
Minghao Li 已提交
17
#include "syncMessage.h"
18
#include "syncRaftEntry.h"
19
#include "syncRaftStore.h"
M
Minghao Li 已提交
20

S
Shengliang Guan 已提交
21
int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS,
S
Shengliang Guan 已提交
22 23
                         SSyncNode* pNode) {
  int32_t bytes = sizeof(SyncTimeout);
S
Shengliang Guan 已提交
24 25 26 27
  pMsg->pCont = rpcMallocCont(bytes);
  pMsg->msgType = TDMT_SYNC_TIMEOUT;
  pMsg->contLen = bytes;
  if (pMsg->pCont == NULL) {
S
Shengliang Guan 已提交
28 29
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
M
Minghao Li 已提交
30
  }
M
Minghao Li 已提交
31

S
Shengliang Guan 已提交
32
  SyncTimeout* pTimeout = pMsg->pCont;
S
Shengliang Guan 已提交
33 34 35 36 37 38
  pTimeout->bytes = bytes;
  pTimeout->msgType = TDMT_SYNC_TIMEOUT;
  pTimeout->vgId = pNode->vgId;
  pTimeout->timeoutType = timeoutType;
  pTimeout->logicClock = logicClock;
  pTimeout->timerMS = timerMS;
M
Minghao Li 已提交
39
  pTimeout->timeStamp = taosGetTimestampMs();
S
Shengliang Guan 已提交
40 41
  pTimeout->data = pNode;
  return 0;
M
Minghao Li 已提交
42 43
}

S
Shengliang Guan 已提交
44 45 46 47 48 49
/*
 * Wrap an existing RPC message into a TDMT_SYNC_CLIENT_REQUEST message.
 *
 * The new buffer holds a SyncClientRequest header followed by a copy of the
 * original message content (pOriginal->contLen bytes). The header records the
 * original message type, seqNum, isWeak, vgId and the payload length.
 *
 * pMsg->msgType and pMsg->contLen are assigned whether or not the allocation
 * succeeds. pOriginal->contLen is not validated here; callers are expected to
 * pass a non-negative length.
 *
 * Returns 0 on success. When rpcMallocCont() yields NULL, terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY and -1 is returned.
 */
int32_t syncBuildClientRequest(SRpcMsg* pMsg, const SRpcMsg* pOriginal, uint64_t seqNum, bool isWeak, int32_t vgId) {
  int32_t bytes = sizeof(SyncClientRequest) + pOriginal->contLen;
  pMsg->pCont = rpcMallocCont(bytes);
  pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
  pMsg->contLen = bytes;
  if (pMsg->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncClientRequest* pClientRequest = pMsg->pCont;
  pClientRequest->bytes = bytes;
  pClientRequest->vgId = vgId;
  pClientRequest->msgType = TDMT_SYNC_CLIENT_REQUEST;
  pClientRequest->originalRpcType = pOriginal->msgType;
  pClientRequest->seqNum = seqNum;
  pClientRequest->isWeak = isWeak;
  pClientRequest->dataLen = pOriginal->contLen;

  // memcpy() requires valid pointers even for a zero-byte copy, so an empty
  // original message (whose pCont may be NULL) skips the call entirely.
  if (pOriginal->contLen != 0) {
    memcpy(pClientRequest->data, pOriginal->pCont, pOriginal->contLen);
  }

  return 0;
}

S
Shengliang Guan 已提交
67
/*
 * Build a TDMT_SYNC_CLIENT_REQUEST message whose payload is a raft entry.
 *
 * The buffer holds a SyncClientRequest header followed by a byte-for-byte copy
 * of *pEntry (pEntry->bytes bytes). originalRpcType is set to TDMT_SYNC_NOOP.
 * seqNum and isWeak are not assigned by this function.
 *
 * pMsg->msgType and pMsg->contLen are assigned whether or not the allocation
 * succeeds.
 *
 * Returns 0 on success. When rpcMallocCont() yields NULL, terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY and -1 is returned.
 */
int32_t syncBuildClientRequestFromNoopEntry(SRpcMsg* pMsg, const SSyncRaftEntry* pEntry, int32_t vgId) {
  int32_t total = sizeof(SyncClientRequest) + pEntry->bytes;

  pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
  pMsg->contLen = total;
  pMsg->pCont = rpcMallocCont(total);
  if (!pMsg->pCont) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncClientRequest* pReq = pMsg->pCont;
  pReq->msgType = TDMT_SYNC_CLIENT_REQUEST;
  pReq->originalRpcType = TDMT_SYNC_NOOP;
  pReq->bytes = total;
  pReq->vgId = vgId;
  pReq->dataLen = pEntry->bytes;
  memcpy(pReq->data, pEntry, pEntry->bytes);

  return 0;
}

S
Shengliang Guan 已提交
88 89 90
/*
 * Build a TDMT_SYNC_REQUEST_VOTE message in pMsg.
 *
 * Allocates sizeof(SyncRequestVote) bytes through rpcMallocCont() and fills
 * only the bytes, msgType and vgId fields; all other fields are left for the
 * caller. pMsg->msgType and pMsg->contLen are assigned even on failure.
 *
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * allocation returns NULL.
 */
int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId) {
  int32_t msgLen = sizeof(SyncRequestVote);

  pMsg->msgType = TDMT_SYNC_REQUEST_VOTE;
  pMsg->contLen = msgLen;
  pMsg->pCont = rpcMallocCont(msgLen);
  if (!pMsg->pCont) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncRequestVote* pVote = pMsg->pCont;
  pVote->vgId = vgId;
  pVote->msgType = TDMT_SYNC_REQUEST_VOTE;
  pVote->bytes = msgLen;
  return 0;
}

S
Shengliang Guan 已提交
105 106 107
/*
 * Build a TDMT_SYNC_REQUEST_VOTE_REPLY message in pMsg.
 *
 * Allocates sizeof(SyncRequestVoteReply) bytes through rpcMallocCont() and
 * fills only the bytes, msgType and vgId fields. pMsg->msgType and
 * pMsg->contLen are assigned even on failure.
 *
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * allocation returns NULL.
 */
int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId) {
  int32_t msgLen = sizeof(SyncRequestVoteReply);

  pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY;
  pMsg->contLen = msgLen;
  pMsg->pCont = rpcMallocCont(msgLen);
  if (!pMsg->pCont) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncRequestVoteReply* pReply = pMsg->pCont;
  pReply->vgId = vgId;
  pReply->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY;
  pReply->bytes = msgLen;
  return 0;
}

122 123 124
/*
 * Build a TDMT_SYNC_APPEND_ENTRIES message with room for dataLen payload bytes.
 *
 * Allocates sizeof(SyncAppendEntries) + dataLen bytes through rpcMallocCont()
 * and fills the bytes, vgId, msgType and dataLen fields. The payload area is
 * not written here. pMsg->msgType and pMsg->contLen are assigned even on
 * failure.
 *
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * allocation returns NULL.
 */
int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) {
  int32_t total = sizeof(SyncAppendEntries) + dataLen;

  pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES;
  pMsg->contLen = total;
  pMsg->pCont = rpcMallocCont(total);
  if (!pMsg->pCont) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncAppendEntries* pAppend = pMsg->pCont;
  pAppend->msgType = TDMT_SYNC_APPEND_ENTRIES;
  pAppend->vgId = vgId;
  pAppend->bytes = total;
  pAppend->dataLen = dataLen;
  return 0;
}

140
/*
 * Build a TDMT_SYNC_APPEND_ENTRIES_REPLY message in pMsg.
 *
 * Allocates sizeof(SyncAppendEntriesReply) bytes through rpcMallocCont() and
 * fills only the bytes, msgType and vgId fields. pMsg->msgType and
 * pMsg->contLen are assigned even on failure.
 *
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * allocation returns NULL.
 */
int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId) {
  int32_t msgLen = sizeof(SyncAppendEntriesReply);

  pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY;
  pMsg->contLen = msgLen;
  pMsg->pCont = rpcMallocCont(msgLen);
  if (!pMsg->pCont) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncAppendEntriesReply* pReply = pMsg->pCont;
  pReply->vgId = vgId;
  pReply->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY;
  pReply->bytes = msgLen;
  return 0;
}
M
Minghao Li 已提交
156

157 158
/*
 * Build a TDMT_SYNC_APPEND_ENTRIES message carrying a copy of one raft entry.
 *
 * The buffer holds a SyncAppendEntries header followed by pEntry->bytes bytes
 * copied from *pEntry. The header is filled from the node and the entry:
 *   - prevLogIndex is pEntry->index - 1, prevLogTerm is the given argument;
 *   - vgId, srcId and commitIndex come from pNode;
 *   - term comes from raftStoreGetTerm(pNode);
 *   - privateTerm is set to 0.
 *
 * pRpcMsg->contLen is assigned before the allocation; pRpcMsg->msgType is
 * assigned only after the allocation has succeeded.
 *
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when
 * rpcMallocCont() returns NULL.
 */
int32_t syncBuildAppendEntriesFromRaftEntry(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm,
                                            SRpcMsg* pRpcMsg) {
  uint32_t entryLen = pEntry->bytes;
  uint32_t total = sizeof(SyncAppendEntries) + entryLen;

  pRpcMsg->contLen = total;
  pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
  if (!pRpcMsg->pCont) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncAppendEntries* pAppend = pRpcMsg->pCont;
  pAppend->bytes = pRpcMsg->contLen;
  pRpcMsg->msgType = TDMT_SYNC_APPEND_ENTRIES;
  pAppend->msgType = pRpcMsg->msgType;
  pAppend->dataLen = entryLen;

  // The whole entry, header included, becomes the message payload.
  (void)memcpy(pAppend->data, pEntry, entryLen);

  pAppend->prevLogIndex = pEntry->index - 1;
  pAppend->prevLogTerm = prevLogTerm;
  pAppend->vgId = pNode->vgId;
  pAppend->srcId = pNode->myRaftId;
  pAppend->term = raftStoreGetTerm(pNode);
  pAppend->commitIndex = pNode->commitIndex;
  pAppend->privateTerm = 0;
  return 0;
}

S
Shengliang Guan 已提交
185 186 187
/*
 * Build a TDMT_SYNC_HEARTBEAT message in pMsg.
 *
 * Allocates sizeof(SyncHeartbeat) bytes through rpcMallocCont() and fills only
 * the bytes, msgType and vgId fields. pMsg->msgType and pMsg->contLen are
 * assigned even on failure.
 *
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * allocation returns NULL.
 */
int32_t syncBuildHeartbeat(SRpcMsg* pMsg, int32_t vgId) {
  int32_t msgLen = sizeof(SyncHeartbeat);

  pMsg->msgType = TDMT_SYNC_HEARTBEAT;
  pMsg->contLen = msgLen;
  pMsg->pCont = rpcMallocCont(msgLen);
  if (!pMsg->pCont) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncHeartbeat* pBeat = pMsg->pCont;
  pBeat->vgId = vgId;
  pBeat->msgType = TDMT_SYNC_HEARTBEAT;
  pBeat->bytes = msgLen;
  return 0;
}

202 203 204
/*
 * Build a TDMT_SYNC_HEARTBEAT_REPLY message in pMsg.
 *
 * Allocates sizeof(SyncHeartbeatReply) bytes through rpcMallocCont() and fills
 * only the bytes, msgType and vgId fields. pMsg->msgType and pMsg->contLen are
 * assigned even on failure.
 *
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * allocation returns NULL.
 */
int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId) {
  int32_t msgLen = sizeof(SyncHeartbeatReply);

  pMsg->msgType = TDMT_SYNC_HEARTBEAT_REPLY;
  pMsg->contLen = msgLen;
  pMsg->pCont = rpcMallocCont(msgLen);
  if (!pMsg->pCont) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncHeartbeatReply* pReply = pMsg->pCont;
  pReply->vgId = vgId;
  pReply->msgType = TDMT_SYNC_HEARTBEAT_REPLY;
  pReply->bytes = msgLen;
  return 0;
}

S
Shengliang Guan 已提交
219
#if 0
S
Shengliang Guan 已提交
220 221 222
/*
 * Build a TDMT_SYNC_PRE_SNAPSHOT message in pMsg.
 * (This definition sits inside an `#if 0` region and is currently compiled out.)
 *
 * Allocates sizeof(SyncPreSnapshot) bytes through rpcMallocCont() and fills
 * only the bytes, msgType and vgId fields. pMsg->msgType and pMsg->contLen are
 * assigned even on failure.
 *
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * allocation returns NULL.
 */
int32_t syncBuildPreSnapshot(SRpcMsg* pMsg, int32_t vgId) {
  int32_t msgLen = sizeof(SyncPreSnapshot);

  pMsg->msgType = TDMT_SYNC_PRE_SNAPSHOT;
  pMsg->contLen = msgLen;
  pMsg->pCont = rpcMallocCont(msgLen);
  if (!pMsg->pCont) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncPreSnapshot* pPre = pMsg->pCont;
  pPre->vgId = vgId;
  pPre->msgType = TDMT_SYNC_PRE_SNAPSHOT;
  pPre->bytes = msgLen;
  return 0;
}

S
Shengliang Guan 已提交
237 238 239
/*
 * Build a TDMT_SYNC_PRE_SNAPSHOT_REPLY message in pMsg.
 * (This definition sits inside an `#if 0` region and is currently compiled out.)
 *
 * Allocates sizeof(SyncPreSnapshotReply) bytes through rpcMallocCont() and
 * fills only the bytes, msgType and vgId fields. pMsg->msgType and
 * pMsg->contLen are assigned even on failure.
 *
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * allocation returns NULL.
 */
int32_t syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId) {
  int32_t msgLen = sizeof(SyncPreSnapshotReply);

  pMsg->msgType = TDMT_SYNC_PRE_SNAPSHOT_REPLY;
  pMsg->contLen = msgLen;
  pMsg->pCont = rpcMallocCont(msgLen);
  if (!pMsg->pCont) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncPreSnapshotReply* pReply = pMsg->pCont;
  pReply->vgId = vgId;
  pReply->msgType = TDMT_SYNC_PRE_SNAPSHOT_REPLY;
  pReply->bytes = msgLen;
  return 0;
}
S
Shengliang Guan 已提交
253
#endif
M
Minghao Li 已提交
254

255 256 257
/*
 * Build a TDMT_SYNC_SNAPSHOT_SEND message with room for dataLen payload bytes.
 *
 * Allocates sizeof(SyncSnapshotSend) + dataLen bytes through rpcMallocCont()
 * and fills the bytes, vgId, msgType and dataLen fields. The payload area is
 * not written here. pMsg->msgType and pMsg->contLen are assigned even on
 * failure.
 *
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * allocation returns NULL.
 */
int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) {
  int32_t total = sizeof(SyncSnapshotSend) + dataLen;

  pMsg->msgType = TDMT_SYNC_SNAPSHOT_SEND;
  pMsg->contLen = total;
  pMsg->pCont = rpcMallocCont(total);
  if (!pMsg->pCont) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncSnapshotSend* pSend = pMsg->pCont;
  pSend->msgType = TDMT_SYNC_SNAPSHOT_SEND;
  pSend->vgId = vgId;
  pSend->bytes = total;
  pSend->dataLen = dataLen;
  return 0;
}

273 274 275
/*
 * Build a TDMT_SYNC_SNAPSHOT_RSP message in pMsg.
 *
 * Allocates sizeof(SyncSnapshotRsp) bytes through rpcMallocCont() and fills
 * only the bytes, msgType and vgId fields. pMsg->msgType and pMsg->contLen are
 * assigned even on failure.
 *
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * allocation returns NULL.
 */
int32_t syncBuildSnapshotSendRsp(SRpcMsg* pMsg, int32_t vgId) {
  int32_t msgLen = sizeof(SyncSnapshotRsp);

  pMsg->msgType = TDMT_SYNC_SNAPSHOT_RSP;
  pMsg->contLen = msgLen;
  pMsg->pCont = rpcMallocCont(msgLen);
  if (!pMsg->pCont) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncSnapshotRsp* pRsp = pMsg->pCont;
  pRsp->vgId = vgId;
  pRsp->msgType = TDMT_SYNC_SNAPSHOT_RSP;
  pRsp->bytes = msgLen;
  return 0;
}

290 291 292
/*
 * Build a TDMT_SYNC_LEADER_TRANSFER message in pMsg.
 *
 * Allocates sizeof(SyncLeaderTransfer) bytes through rpcMallocCont() and fills
 * only the bytes, msgType and vgId fields. pMsg->msgType and pMsg->contLen are
 * assigned even on failure.
 *
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * allocation returns NULL.
 */
int32_t syncBuildLeaderTransfer(SRpcMsg* pMsg, int32_t vgId) {
  int32_t msgLen = sizeof(SyncLeaderTransfer);

  pMsg->msgType = TDMT_SYNC_LEADER_TRANSFER;
  pMsg->contLen = msgLen;
  pMsg->pCont = rpcMallocCont(msgLen);
  if (!pMsg->pCont) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncLeaderTransfer* pTransfer = pMsg->pCont;
  pTransfer->vgId = vgId;
  pTransfer->msgType = TDMT_SYNC_LEADER_TRANSFER;
  pTransfer->bytes = msgLen;
  return 0;
}

S
Shengliang Guan 已提交
307 308 309
/*
 * Build a TDMT_SYNC_LOCAL_CMD message in pMsg.
 *
 * Allocates sizeof(SyncLocalCmd) bytes through rpcMallocCont() and fills only
 * the bytes, msgType and vgId fields. pMsg->msgType and pMsg->contLen are
 * assigned even on failure.
 *
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * allocation returns NULL.
 */
int32_t syncBuildLocalCmd(SRpcMsg* pMsg, int32_t vgId) {
  int32_t msgLen = sizeof(SyncLocalCmd);

  pMsg->msgType = TDMT_SYNC_LOCAL_CMD;
  pMsg->contLen = msgLen;
  pMsg->pCont = rpcMallocCont(msgLen);
  if (!pMsg->pCont) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SyncLocalCmd* pCmd = pMsg->pCont;
  pCmd->vgId = vgId;
  pCmd->msgType = TDMT_SYNC_LOCAL_CMD;
  pCmd->bytes = msgLen;
  return 0;
}

S
Shengliang Guan 已提交
324 325 326 327 328 329 330 331 332 333
/*
 * Map a sync timeout type to a short, static, human-readable label.
 *
 * Returns "ping", "elect" or "heartbeat" for the three known timer types and
 * "unknown" for any other value. The returned string is a literal and must
 * not be freed or modified.
 */
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType) {
  if (timerType == SYNC_TIMEOUT_PING) {
    return "ping";
  }
  if (timerType == SYNC_TIMEOUT_ELECTION) {
    return "elect";
  }
  if (timerType == SYNC_TIMEOUT_HEARTBEAT) {
    return "heartbeat";
  }
  return "unknown";
}

S
Shengliang Guan 已提交
337 338 339 340 341 342 343 344
/*
 * Map a local sync command to a short, static, human-readable label.
 *
 * Returns "step-down" or "follower-commit" for the two known commands and
 * "unknown-local-cmd" for any other value. The returned string is a literal
 * and must not be freed or modified.
 */
const char* syncLocalCmdGetStr(ESyncLocalCmd cmd) {
  if (cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
    return "step-down";
  }
  if (cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
    return "follower-commit";
  }
  return "unknown-local-cmd";
}