syncMessage.c 8.3 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncMessage.h"
#include "syncRaft.h"
M
Minghao Li 已提交
18
#include "syncUtil.h"
M
Minghao Li 已提交
19
#include "tcoding.h"
M
Minghao Li 已提交
20

M
Minghao Li 已提交
21 22
// Hook for handling an incoming sync message. Currently an empty stub: both
// arguments are ignored and nothing is done.
void onMessage(SRaft* pRaft, void* pMsg) {
  (void)pRaft;
  (void)pMsg;
}

M
Minghao Li 已提交
23
// ---- SyncPing message processing ----
M
Minghao Li 已提交
24 25
SyncPing* syncPingBuild(uint32_t dataLen) {
  uint32_t  bytes = SYNC_PING_FIX_LEN + dataLen;
M
Minghao Li 已提交
26 27 28 29 30
  SyncPing* pMsg = malloc(bytes);
  memset(pMsg, 0, bytes);
  pMsg->bytes = bytes;
  pMsg->msgType = SYNC_PING;
  pMsg->dataLen = dataLen;
M
Minghao Li 已提交
31 32
}

M
Minghao Li 已提交
33 34 35
// Releases a SyncPing with free(). A NULL pointer is accepted and ignored.
void syncPingDestroy(SyncPing* pMsg) {
  // free(NULL) is defined to do nothing, so no explicit guard is needed.
  free(pMsg);
}

M
Minghao Li 已提交
39 40 41
// Copies the whole message (pMsg->bytes bytes, header plus payload) into buf.
// Asserts that bufLen is at least pMsg->bytes.
void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen) {
  assert(bufLen >= pMsg->bytes);
  memcpy(buf, pMsg, pMsg->bytes);
}

M
Minghao Li 已提交
44 45 46 47
// Copies len bytes from buf into pMsg, then asserts that the copied header is
// self-consistent: bytes equals len, and bytes equals
// SYNC_PING_FIX_LEN + dataLen.
//
// NOTE(review): the copy happens before the checks, so pMsg must already have
// room for len bytes — confirm against callers.
void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg) {
  memcpy(pMsg, buf, len);

  assert(pMsg->bytes == len);
  assert(SYNC_PING_FIX_LEN + pMsg->dataLen == pMsg->bytes);
}

M
Minghao Li 已提交
50
// Wraps a SyncPing in an SRpcMsg: clears *pRpcMsg, copies the message type and
// total length, obtains a content buffer from rpcMallocCont() and serializes
// the message into it.
void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg) {
  memset(pRpcMsg, 0, sizeof *pRpcMsg);

  pRpcMsg->contLen = pMsg->bytes;
  pRpcMsg->msgType = pMsg->msgType;

  // The content buffer is sized from the length just stored in the rpc message.
  pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
  syncPingSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}

M
Minghao Li 已提交
58 59
// Fills pMsg from the content buffer of an SRpcMsg by handing pCont/contLen to
// syncPingDeserialize().
void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg) {
  syncPingDeserialize(pRpcMsg->pCont,
                      pRpcMsg->contLen,
                      pMsg);
}

M
Minghao Li 已提交
62
cJSON* syncPing2Json(const SyncPing* pMsg) {
M
Minghao Li 已提交
63
  cJSON* pRoot = cJSON_CreateObject();
M
Minghao Li 已提交
64 65
  cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
  cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
M
Minghao Li 已提交
66

M
Minghao Li 已提交
67
  cJSON* pSrcId = cJSON_CreateObject();
M
Minghao Li 已提交
68
  cJSON_AddNumberToObject(pSrcId, "addr", pMsg->srcId.addr);
M
Minghao Li 已提交
69 70 71 72 73 74 75 76 77
  {
    uint64_t u64 = pMsg->srcId.addr;
    cJSON*   pTmp = pSrcId;
    char     host[128];
    uint16_t port;
    syncUtilU642Addr(u64, host, sizeof(host), &port);
    cJSON_AddStringToObject(pTmp, "addr_host", host);
    cJSON_AddNumberToObject(pTmp, "addr_port", port);
  }
M
Minghao Li 已提交
78
  cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
M
Minghao Li 已提交
79 80 81
  cJSON_AddItemToObject(pRoot, "srcId", pSrcId);

  cJSON* pDestId = cJSON_CreateObject();
M
Minghao Li 已提交
82
  cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr);
M
Minghao Li 已提交
83 84 85 86 87 88 89 90 91
  {
    uint64_t u64 = pMsg->destId.addr;
    cJSON*   pTmp = pDestId;
    char     host[128];
    uint16_t port;
    syncUtilU642Addr(u64, host, sizeof(host), &port);
    cJSON_AddStringToObject(pTmp, "addr_host", host);
    cJSON_AddNumberToObject(pTmp, "addr_port", port);
  }
M
Minghao Li 已提交
92
  cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
M
Minghao Li 已提交
93
  cJSON_AddItemToObject(pRoot, "destId", pDestId);
M
Minghao Li 已提交
94

M
Minghao Li 已提交
95 96
  cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
  cJSON_AddStringToObject(pRoot, "data", pMsg->data);
M
Minghao Li 已提交
97

M
Minghao Li 已提交
98 99 100 101 102
  cJSON* pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SyncPing", pRoot);
  return pJson;
}

M
Minghao Li 已提交
103 104 105 106 107 108 109 110 111 112 113 114 115 116
// Builds a SyncPing whose payload is a copy of the C string str (including its
// terminating NUL) and whose srcId/destId are copied from the arguments.
//
// Returns the new message, or NULL if syncPingBuild() yields NULL. str must be
// a valid NUL-terminated string; srcId and destId must be non-NULL.
SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str) {
  uint32_t  dataLen = strlen(str) + 1;
  SyncPing* pMsg = syncPingBuild(dataLen);
  if (pMsg == NULL) {
    // Do not write through a failed allocation.
    return NULL;
  }
  pMsg->srcId = *srcId;
  pMsg->destId = *destId;
  // dataLen already counts the terminator, so the whole string fits.
  snprintf(pMsg->data, pMsg->dataLen, "%s", str);
  return pMsg;
}

// Convenience builder: a SyncPing from srcId to destId carrying the fixed
// payload string "ping".
SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId) {
  return syncPingBuild2(srcId, destId, "ping");
}

M
Minghao Li 已提交
117 118 119 120 121 122
// ---- SyncPingReply message processing ----
SyncPingReply* syncPingReplyBuild(uint32_t dataLen) {
  uint32_t       bytes = SYNC_PING_REPLY_FIX_LEN + dataLen;
  SyncPingReply* pMsg = malloc(bytes);
  memset(pMsg, 0, bytes);
  pMsg->bytes = bytes;
M
Minghao Li 已提交
123
  pMsg->msgType = SYNC_PING_REPLY;
M
Minghao Li 已提交
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
  pMsg->dataLen = dataLen;
}

// Releases a SyncPingReply with free(). A NULL pointer is accepted and ignored.
void syncPingReplyDestroy(SyncPingReply* pMsg) {
  // free(NULL) is defined to do nothing, so no explicit guard is needed.
  free(pMsg);
}

// Copies the whole reply (pMsg->bytes bytes, header plus payload) into buf.
// Asserts that bufLen is at least pMsg->bytes.
void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen) {
  assert(bufLen >= pMsg->bytes);
  memcpy(buf, pMsg, pMsg->bytes);
}

// Copies len bytes from buf into pMsg, then asserts that the copied header is
// self-consistent: bytes equals len, and bytes equals
// SYNC_PING_REPLY_FIX_LEN + dataLen.
//
// NOTE(review): the copy happens before the checks, so pMsg must already have
// room for len bytes — confirm against callers.
void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg) {
  memcpy(pMsg, buf, len);
  assert(len == pMsg->bytes);
  // Check against the reply's own fixed length (the one syncPingReplyBuild
  // uses), not the SyncPing constant that was pasted here before.
  assert(pMsg->bytes == SYNC_PING_REPLY_FIX_LEN + pMsg->dataLen);
}

// Wraps a SyncPingReply in an SRpcMsg: clears *pRpcMsg, copies the message
// type and total length, obtains a content buffer from rpcMallocCont() and
// serializes the reply into it.
void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg) {
  memset(pRpcMsg, 0, sizeof *pRpcMsg);

  pRpcMsg->contLen = pMsg->bytes;
  pRpcMsg->msgType = pMsg->msgType;

  // The content buffer is sized from the length just stored in the rpc message.
  pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
  syncPingReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}

// Fills pMsg from the content buffer of an SRpcMsg by handing pCont/contLen to
// syncPingReplyDeserialize().
void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg) {
  syncPingReplyDeserialize(pRpcMsg->pCont,
                           pRpcMsg->contLen,
                           pMsg);
}

cJSON* syncPingReply2Json(const SyncPingReply* pMsg) {
  cJSON* pRoot = cJSON_CreateObject();
  cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
  cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);

  cJSON* pSrcId = cJSON_CreateObject();
  cJSON_AddNumberToObject(pSrcId, "addr", pMsg->srcId.addr);
M
Minghao Li 已提交
163 164 165 166 167 168 169 170 171
  {
    uint64_t u64 = pMsg->srcId.addr;
    cJSON*   pTmp = pSrcId;
    char     host[128];
    uint16_t port;
    syncUtilU642Addr(u64, host, sizeof(host), &port);
    cJSON_AddStringToObject(pTmp, "addr_host", host);
    cJSON_AddNumberToObject(pTmp, "addr_port", port);
  }
M
Minghao Li 已提交
172 173 174 175 176
  cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
  cJSON_AddItemToObject(pRoot, "srcId", pSrcId);

  cJSON* pDestId = cJSON_CreateObject();
  cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr);
M
Minghao Li 已提交
177 178 179 180 181 182 183 184 185
  {
    uint64_t u64 = pMsg->destId.addr;
    cJSON*   pTmp = pDestId;
    char     host[128];
    uint16_t port;
    syncUtilU642Addr(u64, host, sizeof(host), &port);
    cJSON_AddStringToObject(pTmp, "addr_host", host);
    cJSON_AddNumberToObject(pTmp, "addr_port", port);
  }
M
Minghao Li 已提交
186
  cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
M
Minghao Li 已提交
187
  cJSON_AddItemToObject(pRoot, "destId", pDestId);
M
Minghao Li 已提交
188 189 190 191 192 193 194

  cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
  cJSON_AddStringToObject(pRoot, "data", pMsg->data);

  cJSON* pJson = cJSON_CreateObject();
  cJSON_AddItemToObject(pJson, "SyncPingReply", pRoot);
  return pJson;
M
Minghao Li 已提交
195 196
}

M
Minghao Li 已提交
197 198 199 200 201 202 203 204 205 206 207 208 209 210
// Builds a SyncPingReply whose payload is a copy of the C string str
// (including its terminating NUL) and whose srcId/destId are copied from the
// arguments.
//
// Returns the new message, or NULL if syncPingReplyBuild() yields NULL. str
// must be a valid NUL-terminated string; srcId and destId must be non-NULL.
SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str) {
  uint32_t       dataLen = strlen(str) + 1;
  SyncPingReply* pMsg = syncPingReplyBuild(dataLen);
  if (pMsg == NULL) {
    // Do not write through a failed allocation.
    return NULL;
  }
  pMsg->srcId = *srcId;
  pMsg->destId = *destId;
  // dataLen already counts the terminator, so the whole string fits.
  snprintf(pMsg->data, pMsg->dataLen, "%s", str);
  return pMsg;
}

// Convenience builder: a SyncPingReply from srcId to destId carrying the fixed
// payload string "pang".
SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId) {
  return syncPingReplyBuild2(srcId, destId, "pang");
}

M
Minghao Li 已提交
211
#if 0
M
Minghao Li 已提交
212 213
void syncPingSerialize(const SyncPing* pMsg, char** ppBuf, uint32_t* bufLen) {
  *bufLen = sizeof(SyncPing) + pMsg->dataLen;
M
Minghao Li 已提交
214 215 216 217 218
  *ppBuf = (char*)malloc(*bufLen);
  void*    pStart = *ppBuf;
  uint32_t allBytes = *bufLen;

  int len = 0;
M
Minghao Li 已提交
219
  len = taosEncodeFixedU32(&pStart, pMsg->msgType);
M
Minghao Li 已提交
220 221 222 223
  allBytes -= len;
  assert(len > 0);
  pStart += len;

M
Minghao Li 已提交
224
  len = taosEncodeFixedU64(&pStart, pMsg->srcId.addr);
M
Minghao Li 已提交
225 226 227 228
  allBytes -= len;
  assert(len > 0);
  pStart += len;

M
Minghao Li 已提交
229
  len = taosEncodeFixedI32(&pStart, pMsg->srcId.vgId);
M
Minghao Li 已提交
230 231 232 233
  allBytes -= len;
  assert(len > 0);
  pStart += len;

M
Minghao Li 已提交
234
  len = taosEncodeFixedU64(&pStart, pMsg->destId.addr);
M
Minghao Li 已提交
235 236 237 238
  allBytes -= len;
  assert(len > 0);
  pStart += len;

M
Minghao Li 已提交
239
  len = taosEncodeFixedI32(&pStart, pMsg->destId.vgId);
M
Minghao Li 已提交
240 241 242 243
  allBytes -= len;
  assert(len > 0);
  pStart += len;

M
Minghao Li 已提交
244
  len = taosEncodeFixedU32(&pStart, pMsg->dataLen);
M
Minghao Li 已提交
245 246 247 248
  allBytes -= len;
  assert(len > 0);
  pStart += len;

M
Minghao Li 已提交
249 250
  memcpy(pStart, pMsg->data, pMsg->dataLen);
  allBytes -= pMsg->dataLen;
M
Minghao Li 已提交
251 252 253 254
  assert(allBytes == 0);
}


M
Minghao Li 已提交
255
// Disabled (inside #if 0) field-by-field decoder for SyncPing.
//
// Reads, in order: msgType, srcId.addr, srcId.vgId, destId.addr, destId.vgId
// and dataLen from buf into pMsg, using the taosDecodeFixed* helpers, each of
// which returns the position of the next field.
//
// NOTE(review): len is not checked, and neither pMsg->bytes nor the payload
// (pMsg->data) is restored here — confirm whether that is intended before
// re-enabling this code.
void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg) {
  void*    pStart = (void*)buf;
  uint64_t u64;
  int32_t  i32;
  uint32_t u32;

  // msgType is written with taosEncodeFixedU32 by the matching encoder, so it
  // must be read back as 32 bits; reading 64 bits here would shift every
  // following field.
  pStart = taosDecodeFixedU32(pStart, &u32);
  pMsg->msgType = u32;

  pStart = taosDecodeFixedU64(pStart, &u64);
  pMsg->srcId.addr = u64;

  pStart = taosDecodeFixedI32(pStart, &i32);
  pMsg->srcId.vgId = i32;

  pStart = taosDecodeFixedU64(pStart, &u64);
  pMsg->destId.addr = u64;

  pStart = taosDecodeFixedI32(pStart, &i32);
  pMsg->destId.vgId = i32;

  pStart = taosDecodeFixedU32(pStart, &u32);
  pMsg->dataLen = u32;
}
M
Minghao Li 已提交
279
#endif