syncIO.c 13.0 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "syncIO.h"
H
Haojun Liao 已提交
17
#include <tdatablock.h>
M
Minghao Li 已提交
18
#include "os.h"
M
Minghao Li 已提交
19
#include "syncMessage.h"
M
Minghao Li 已提交
20
#include "syncUtil.h"
M
Minghao Li 已提交
21 22 23 24
#include "tglobal.h"
#include "ttimer.h"
#include "tutil.h"

M
Minghao Li 已提交
25 26
SSyncIO *gSyncIO = NULL;

M
Minghao Li 已提交
27
// local function ------------
M
sync io  
Minghao Li 已提交
28 29
static SSyncIO *syncIOCreate(char *host, uint16_t port);
static int32_t  syncIODestroy(SSyncIO *io);
M
Minghao Li 已提交
30 31
static int32_t  syncIOStartInternal(SSyncIO *io);
static int32_t  syncIOStopInternal(SSyncIO *io);
M
sync io  
Minghao Li 已提交
32

M
Minghao Li 已提交
33
static void *  syncIOConsumerFunc(void *param);
M
Minghao Li 已提交
34 35 36 37 38 39 40 41 42 43
static void    syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void    syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey);

static int32_t syncIOStartQ(SSyncIO *io);
static int32_t syncIOStopQ(SSyncIO *io);
static int32_t syncIOStartPing(SSyncIO *io);
static int32_t syncIOStopPing(SSyncIO *io);
static void    syncIOTickQ(void *param, void *tmrId);
static void    syncIOTickPing(void *param, void *tmrId);
M
Minghao Li 已提交
44 45
// ----------------------------

M
sync io  
Minghao Li 已提交
46 47
// public function ------------
int32_t syncIOStart(char *host, uint16_t port) {
M
Minghao Li 已提交
48
  int32_t ret = 0;
M
sync io  
Minghao Li 已提交
49
  gSyncIO = syncIOCreate(host, port);
M
Minghao Li 已提交
50 51
  assert(gSyncIO != NULL);

wafwerar's avatar
wafwerar 已提交
52
  taosSeedRand(taosGetTimestampSec());
M
Minghao Li 已提交
53
  ret = syncIOStartInternal(gSyncIO);
M
Minghao Li 已提交
54 55
  assert(ret == 0);

M
Minghao Li 已提交
56 57
  sTrace("syncIOStart ok, gSyncIO:%p", gSyncIO);
  return ret;
M
Minghao Li 已提交
58
}
M
Minghao Li 已提交
59

M
sync io  
Minghao Li 已提交
60 61 62
// Stops the global sync IO instance and then destroys it.
// Each step is asserted to return 0; the result of the destroy step is returned.
int32_t syncIOStop() {
  const int32_t stopCode = syncIOStopInternal(gSyncIO);
  assert(stopCode == 0);

  const int32_t destroyCode = syncIODestroy(gSyncIO);
  assert(destroyCode == 0);

  return destroyCode;
}

S
Shengliang Guan 已提交
69
// Sends pMsg to the single endpoint in pEpSet through the global client RPC.
// The endpoint set must have exactly one entry and inUse == 0 (asserted).
// The message is marked as not expecting a response. Always returns 0.
int32_t syncIOSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  assert(pEpSet->inUse == 0);
  assert(pEpSet->numOfEps == 1);

  // The payload is passed through syncUtilMsgNtoH() for logging and then
  // through syncUtilMsgHtoN() again before it is handed to the RPC layer
  // (presumably a byte-order round trip -- confirm in syncUtil).
  char tag[256];
  syncUtilMsgNtoH(pMsg->pCont);
  snprintf(tag, sizeof(tag), "==syncIOSendMsg== %s:%d", pEpSet->eps[0].fqdn, pEpSet->eps[0].port);
  syncRpcMsgLog2(tag, pMsg);
  syncUtilMsgHtoN(pMsg->pCont);

  pMsg->info.handle = NULL;
  pMsg->info.noResp = 1;
  rpcSendRequest(gSyncIO->clientRpc, pEpSet, pMsg, NULL);

  return 0;
}

S
Shengliang Guan 已提交
90
// Enqueues a shallow copy of pMsg onto the global sync IO message queue.
//
// msgcb is not used by this implementation.
// Only the SRpcMsg header is copied; the copy shares pMsg->pCont with the
// original, and the consumer thread releases that payload with rpcFreeCont()
// once the queued item has been processed.
//
// Returns 0 on success, -1 if the queue item could not be allocated (in which
// case nothing is enqueued and pMsg is left untouched).
int32_t syncIOEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  syncRpcMsgLog2((char *)"==syncIOEqMsg==", pMsg);

  SRpcMsg *pItem = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
  if (pItem == NULL) {
    // Previously the NULL result went straight into memcpy().
    sError("syncIOEqMsg failed to allocate queue item");
    return -1;
  }
  memcpy(pItem, pMsg, sizeof(SRpcMsg));

  STaosQueue *pMsgQ = gSyncIO->pMsgQ;
  taosWriteQitem(pMsgQ, pItem);

  return 0;
}

// Arms the queue tick timer of the global sync IO instance.
// The underlying call is asserted to return 0, and that value is returned.
int32_t syncIOQTimerStart() {
  const int32_t code = syncIOStartQ(gSyncIO);
  assert(code == 0);
  return code;
}

// Stops the queue tick timer of the global sync IO instance.
// The underlying call is asserted to return 0, and that value is returned.
int32_t syncIOQTimerStop() {
  const int32_t code = syncIOStopQ(gSyncIO);
  assert(code == 0);
  return code;
}

// Arms the ping tick timer of the global sync IO instance.
// The underlying call is asserted to return 0, and that value is returned.
int32_t syncIOPingTimerStart() {
  const int32_t code = syncIOStartPing(gSyncIO);
  assert(code == 0);
  return code;
}

// Stops the ping tick timer of the global sync IO instance.
// The underlying call is asserted to return 0, and that value is returned.
int32_t syncIOPingTimerStop() {
  const int32_t code = syncIOStopPing(gSyncIO);
  assert(code == 0);
  return code;
}

M
sync io  
Minghao Li 已提交
129
// local function ------------
M
Minghao Li 已提交
130
static SSyncIO *syncIOCreate(char *host, uint16_t port) {
wafwerar's avatar
wafwerar 已提交
131
  SSyncIO *io = (SSyncIO *)taosMemoryMalloc(sizeof(SSyncIO));
M
Minghao Li 已提交
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
  memset(io, 0, sizeof(*io));

  io->pMsgQ = taosOpenQueue();
  io->pQset = taosOpenQset();
  taosAddIntoQset(io->pQset, io->pMsgQ, NULL);

  io->myAddr.inUse = 0;
  io->myAddr.numOfEps = 0;
  addEpIntoEpSet(&io->myAddr, host, port);

  io->qTimerMS = TICK_Q_TIMER_MS;
  io->pingTimerMS = TICK_Ping_TIMER_MS;

  return io;
}

// Releases everything owned by a stopped sync IO instance, including the
// instance itself.
//
// The instance must already be stopped (isStart == 0, asserted). Any open
// server/client RPC handle is closed, the message queue and queue set are
// closed, and the SSyncIO allocated by syncIOCreate() is freed. If io is the
// global instance, gSyncIO is reset to NULL so it does not dangle.
//
// io must not be used after this call. Always returns 0.
static int32_t syncIODestroy(SSyncIO *io) {
  int8_t started = atomic_load_8(&io->isStart);
  assert(started == 0);

  if (io->serverRpc != NULL) {
    rpcClose(io->serverRpc);
    io->serverRpc = NULL;
  }

  if (io->clientRpc != NULL) {
    rpcClose(io->clientRpc);
    io->clientRpc = NULL;
  }

  taosCloseQueue(io->pMsgQ);
  io->pMsgQ = NULL;
  taosCloseQset(io->pQset);
  io->pQset = NULL;

  // The instance was never freed before, leaking one SSyncIO per start/stop.
  if (gSyncIO == io) {
    gSyncIO = NULL;
  }
  taosMemoryFree(io);

  return 0;
}

M
sync io  
Minghao Li 已提交
169
// Brings a freshly created sync IO instance online.
//
// Steps, in order: block SIGPIPE, initialise the RPC module, open the client
// RPC (replies go to syncIOProcessReply), open the server RPC on the port
// recorded in io->myAddr (requests go to syncIOProcessRequest with io as the
// parent), spawn the consumer thread, create the timer manager and finally
// mark the instance as started.
//
// Returns 0 on success, -1 if either RPC endpoint or the consumer thread could
// not be created. Handles opened before a failure stay recorded in io so that
// syncIODestroy() can close them.
static int32_t syncIOStartInternal(SSyncIO *io) {
  taosBlockSIGPIPE();

  rpcInit();

  // client rpc init
  {
    // Named clientInit (not rpcInit) so the rpcInit() function is not shadowed.
    SRpcInit clientInit;
    memset(&clientInit, 0, sizeof(clientInit));
    clientInit.localPort = 0;
    clientInit.label = "SYNC-IO-CLIENT";
    clientInit.numOfThreads = 1;
    clientInit.cfp = syncIOProcessReply;
    clientInit.sessions = 100;
    clientInit.idleTime = 100;
    clientInit.user = "sync-io";
    clientInit.connType = TAOS_CONN_CLIENT;

    io->clientRpc = rpcOpen(&clientInit);
    if (io->clientRpc == NULL) {
      sError("failed to initialize RPC");
      return -1;
    }
  }

  // server rpc init
  {
    SRpcInit serverInit;
    memset(&serverInit, 0, sizeof(serverInit));
    snprintf(serverInit.localFqdn, sizeof(serverInit.localFqdn), "%s", "127.0.0.1");
    serverInit.localPort = io->myAddr.eps[0].port;
    serverInit.label = "SYNC-IO-SERVER";
    serverInit.numOfThreads = 1;
    serverInit.cfp = syncIOProcessRequest;
    serverInit.sessions = 1000;
    serverInit.idleTime = 2 * 1500;
    serverInit.parent = io;
    serverInit.connType = TAOS_CONN_SERVER;

    // Keep the handle in io: it used to live only in a local variable, so
    // syncIODestroy() always saw serverRpc == NULL and never closed the server.
    io->serverRpc = rpcOpen(&serverInit);
    if (io->serverRpc == NULL) {
      sError("failed to start RPC server");
      return -1;
    }
  }

  // start consumer thread
  {
    if (taosThreadCreate(&io->consumerTid, NULL, syncIOConsumerFunc, io) != 0) {
      sError("failed to create sync consumer thread since %s", strerror(errno));
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
  }

  // start tmr thread
  io->timerMgr = taosTmrInit(1000, 50, 10000, "SYNC-IO");

  atomic_store_8(&io->isStart, 1);
  return 0;
}

M
sync io  
Minghao Li 已提交
232
// Takes a running sync IO instance offline: clears the started flag, joins the
// consumer thread and clears its handle, then cleans up the timer manager.
// Always returns 0.
static int32_t syncIOStopInternal(SSyncIO *io) {
  atomic_store_8(&io->isStart, 0);

  taosThreadJoin(io->consumerTid, NULL);
  taosThreadClear(&io->consumerTid);

  taosTmrCleanUp(io->timerMgr);

  return 0;
}

static void *syncIOConsumerFunc(void *param) {
M
Minghao Li 已提交
242
  SSyncIO *  io = param;
M
sync io  
Minghao Li 已提交
243
  STaosQall *qall;
M
Minghao Li 已提交
244
  SRpcMsg *  pRpcMsg, rpcMsg;
M
sync io  
Minghao Li 已提交
245 246 247 248 249
  qall = taosAllocateQall();

  while (1) {
    int numOfMsgs = taosReadAllQitemsFromQset(io->pQset, qall, NULL, NULL);
    sTrace("syncIOConsumerFunc %d msgs are received", numOfMsgs);
M
Minghao Li 已提交
250 251 252
    if (numOfMsgs <= 0) {
      break;
    }
M
sync io  
Minghao Li 已提交
253 254 255

    for (int i = 0; i < numOfMsgs; ++i) {
      taosGetQitem(qall, (void **)&pRpcMsg);
M
Minghao Li 已提交
256
      syncRpcMsgLog2((char *)"==syncIOConsumerFunc==", pRpcMsg);
M
Minghao Li 已提交
257

M
Minghao Li 已提交
258
      // use switch case instead of if else
M
Minghao Li 已提交
259
      if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) {
M
sync io  
Minghao Li 已提交
260
        if (io->FpOnSyncPing != NULL) {
M
Minghao Li 已提交
261 262 263 264
          SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
          assert(pSyncMsg != NULL);
          io->FpOnSyncPing(io->pSyncNode, pSyncMsg);
          syncPingDestroy(pSyncMsg);
M
sync io  
Minghao Li 已提交
265
        }
M
Minghao Li 已提交
266

M
Minghao Li 已提交
267
      } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
M
Minghao Li 已提交
268
        if (io->FpOnSyncPingReply != NULL) {
M
Minghao Li 已提交
269
          SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
M
Minghao Li 已提交
270
          assert(pSyncMsg != NULL);
M
Minghao Li 已提交
271
          io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg);
M
Minghao Li 已提交
272
          syncPingReplyDestroy(pSyncMsg);
M
Minghao Li 已提交
273
        }
M
Minghao Li 已提交
274

M
Minghao Li 已提交
275
      } else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
M
Minghao Li 已提交
276 277
        if (io->FpOnSyncClientRequest != NULL) {
          SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
M
Minghao Li 已提交
278
          assert(pSyncMsg != NULL);
M
Minghao Li 已提交
279 280 281 282
          io->FpOnSyncClientRequest(io->pSyncNode, pSyncMsg);
          syncClientRequestDestroy(pSyncMsg);
        }

M
Minghao Li 已提交
283
      } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
M
Minghao Li 已提交
284
        if (io->FpOnSyncRequestVote != NULL) {
M
Minghao Li 已提交
285
          SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
M
Minghao Li 已提交
286
          assert(pSyncMsg != NULL);
M
Minghao Li 已提交
287 288 289 290
          io->FpOnSyncRequestVote(io->pSyncNode, pSyncMsg);
          syncRequestVoteDestroy(pSyncMsg);
        }

M
Minghao Li 已提交
291
      } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
M
Minghao Li 已提交
292
        if (io->FpOnSyncRequestVoteReply != NULL) {
M
Minghao Li 已提交
293
          SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
M
Minghao Li 已提交
294
          assert(pSyncMsg != NULL);
M
Minghao Li 已提交
295 296 297 298
          io->FpOnSyncRequestVoteReply(io->pSyncNode, pSyncMsg);
          syncRequestVoteReplyDestroy(pSyncMsg);
        }

M
Minghao Li 已提交
299
      } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
M
Minghao Li 已提交
300
        if (io->FpOnSyncAppendEntries != NULL) {
M
Minghao Li 已提交
301
          SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
M
Minghao Li 已提交
302
          assert(pSyncMsg != NULL);
M
Minghao Li 已提交
303 304 305 306
          io->FpOnSyncAppendEntries(io->pSyncNode, pSyncMsg);
          syncAppendEntriesDestroy(pSyncMsg);
        }

M
Minghao Li 已提交
307
      } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
M
Minghao Li 已提交
308
        if (io->FpOnSyncAppendEntriesReply != NULL) {
M
Minghao Li 已提交
309
          SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
M
Minghao Li 已提交
310
          assert(pSyncMsg != NULL);
M
Minghao Li 已提交
311 312 313 314
          io->FpOnSyncAppendEntriesReply(io->pSyncNode, pSyncMsg);
          syncAppendEntriesReplyDestroy(pSyncMsg);
        }

M
Minghao Li 已提交
315
      } else if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
M
Minghao Li 已提交
316
        if (io->FpOnSyncTimeout != NULL) {
M
Minghao Li 已提交
317
          SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
M
Minghao Li 已提交
318
          assert(pSyncMsg != NULL);
M
Minghao Li 已提交
319 320 321 322
          io->FpOnSyncTimeout(io->pSyncNode, pSyncMsg);
          syncTimeoutDestroy(pSyncMsg);
        }
      } else {
M
Minghao Li 已提交
323
        sTrace("unknown msgType:%d, no operator", pRpcMsg->msgType);
M
sync io  
Minghao Li 已提交
324 325 326 327 328 329 330 331
      }
    }

    taosResetQitems(qall);
    for (int i = 0; i < numOfMsgs; ++i) {
      taosGetQitem(qall, (void **)&pRpcMsg);
      rpcFreeCont(pRpcMsg->pCont);

M
Minghao Li 已提交
332 333 334 335 336 337 338 339 340 341 342
      /*
            if (pRpcMsg->handle != NULL) {
              int msgSize = 32;
              memset(&rpcMsg, 0, sizeof(rpcMsg));
              rpcMsg.msgType = SYNC_RESPONSE;
              rpcMsg.pCont = rpcMallocCont(msgSize);
              rpcMsg.contLen = msgSize;
              snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", "give a reply");
              rpcMsg.handle = pRpcMsg->handle;
              rpcMsg.code = 0;

M
Minghao Li 已提交
343
              syncRpcMsgLog2((char *)"syncIOConsumerFunc rpcSendResponse --> ", &rpcMsg);
M
Minghao Li 已提交
344 345 346
              rpcSendResponse(&rpcMsg);
            }
      */
M
sync io  
Minghao Li 已提交
347 348 349

      taosFreeQitem(pRpcMsg);
    }
M
Minghao Li 已提交
350 351
  }

M
sync io  
Minghao Li 已提交
352 353 354 355 356
  taosFreeQall(qall);
  return NULL;
}

// Server-side RPC callback: queues an incoming request for the consumer thread.
//
// pParent is the owning SSyncIO (set as rpcInit.parent when the server RPC is
// opened); pEpSet is not used. The payload is passed through
// syncUtilMsgNtoH(), logged, and a shallow copy of the SRpcMsg header is
// written to io->pMsgQ. The consumer thread later frees the payload with
// rpcFreeCont().
//
// If the queue item cannot be allocated the request is dropped and its
// payload is freed here, since it will never reach the consumer.
static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
  syncUtilMsgNtoH(pMsg->pCont);

  syncRpcMsgLog2((char *)"==syncIOProcessRequest==", pMsg);

  SSyncIO *io = pParent;
  SRpcMsg *pItem = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
  if (pItem == NULL) {
    // Previously the NULL result went straight into memcpy().
    sError("syncIOProcessRequest failed to allocate queue item, request dropped");
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
    return;
  }

  memcpy(pItem, pMsg, sizeof(SRpcMsg));
  taosWriteQitem(io->pMsgQ, pItem);
}

// Client-side RPC callback: logs the reply and frees its payload.
// A common-response reply only produces a short trace line; any other message
// type is logged in full. pParent and pEpSet are not used.
static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
  if (pMsg->msgType != TDMT_VND_SYNC_COMMON_RESPONSE) {
    syncRpcMsgLog2((char *)"==syncIOProcessReply==", pMsg);
  } else {
    sTrace("==syncIOProcessReply==");
  }

  rpcFreeCont(pMsg->pCont);
}

M
Minghao Li 已提交
376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406
// Placeholder authentication hook: a real application would look up the auth
// info for meterId in a database or data file. This demo version ignores all
// of its arguments and always reports success (0).
static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) {
  return 0;
}

// (Re)arms the queue tick timer so that syncIOTickQ runs with io as its
// argument after io->qTimerMS; the timer handle is kept in io->qTimer.
// Always returns 0.
static int32_t syncIOStartQ(SSyncIO *io) {
  taosTmrReset(syncIOTickQ, io->qTimerMS, io, io->timerMgr, &io->qTimer);
  return 0;
}

// Stops the queue tick timer and forgets its handle. Always returns 0.
static int32_t syncIOStopQ(SSyncIO *io) {
  taosTmrStop(io->qTimer);
  io->qTimer = NULL;

  return 0;
}

// (Re)arms the ping tick timer so that syncIOTickPing runs with io as its
// argument after io->pingTimerMS; the timer handle is kept in io->pingTimer.
// Always returns 0.
static int32_t syncIOStartPing(SSyncIO *io) {
  taosTmrReset(syncIOTickPing, io->pingTimerMS, io, io->timerMgr, &io->pingTimer);
  return 0;
}

// Stops the ping tick timer and forgets its handle. Always returns 0.
static int32_t syncIOStopPing(SSyncIO *io) {
  taosTmrStop(io->pingTimer);
  io->pingTimer = NULL;

  return 0;
}
M
sync io  
Minghao Li 已提交
408

M
Minghao Li 已提交
409
static void syncIOTickQ(void *param, void *tmrId) {
M
sync io  
Minghao Li 已提交
410 411
  SSyncIO *io = (SSyncIO *)param;

M
Minghao Li 已提交
412 413 414 415 416
  SRaftId srcId, destId;
  srcId.addr = syncUtilAddr2U64(io->myAddr.eps[0].fqdn, io->myAddr.eps[0].port);
  srcId.vgId = -1;
  destId.addr = syncUtilAddr2U64(io->myAddr.eps[0].fqdn, io->myAddr.eps[0].port);
  destId.vgId = -1;
M
Minghao Li 已提交
417
  SyncPingReply *pMsg = syncPingReplyBuild2(&srcId, &destId, -1, "syncIOTickQ");
M
sync io  
Minghao Li 已提交
418

M
Minghao Li 已提交
419 420
  SRpcMsg rpcMsg;
  syncPingReply2RpcMsg(pMsg, &rpcMsg);
M
sync io  
Minghao Li 已提交
421
  SRpcMsg *pTemp;
422
  pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
M
sync io  
Minghao Li 已提交
423
  memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg));
M
Minghao Li 已提交
424
  syncRpcMsgLog2((char *)"==syncIOTickQ==", &rpcMsg);
M
sync io  
Minghao Li 已提交
425
  taosWriteQitem(io->pMsgQ, pTemp);
M
Minghao Li 已提交
426 427 428
  syncPingReplyDestroy(pMsg);

  taosTmrReset(syncIOTickQ, io->qTimerMS, io, io->timerMgr, &io->qTimer);
M
sync io  
Minghao Li 已提交
429 430
}

M
Minghao Li 已提交
431 432 433 434 435 436 437 438
static void syncIOTickPing(void *param, void *tmrId) {
  SSyncIO *io = (SSyncIO *)param;

  SRaftId srcId, destId;
  srcId.addr = syncUtilAddr2U64(io->myAddr.eps[0].fqdn, io->myAddr.eps[0].port);
  srcId.vgId = -1;
  destId.addr = syncUtilAddr2U64(io->myAddr.eps[0].fqdn, io->myAddr.eps[0].port);
  destId.vgId = -1;
M
Minghao Li 已提交
439
  SyncPing *pMsg = syncPingBuild2(&srcId, &destId, -1, "syncIOTickPing");
M
Minghao Li 已提交
440 441 442 443
  // SyncPing *pMsg = syncPingBuild3(&srcId, &destId);

  SRpcMsg rpcMsg;
  syncPing2RpcMsg(pMsg, &rpcMsg);
M
Minghao Li 已提交
444
  syncRpcMsgLog2((char *)"==syncIOTickPing==", &rpcMsg);
M
Minghao Li 已提交
445 446 447 448
  rpcSendRequest(io->clientRpc, &io->myAddr, &rpcMsg, NULL);
  syncPingDestroy(pMsg);

  taosTmrReset(syncIOTickPing, io->pingTimerMS, io, io->timerMgr, &io->pingTimer);
dengyihao's avatar
dengyihao 已提交
449
}