syncRetrieve.c 14.5 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
17 18
#include <sys/inotify.h>
#include "os.h"
S
Shengliang Guan 已提交
19
#include "taoserror.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
20 21 22 23 24 25 26 27 28
#include "tlog.h"
#include "tutil.h"
#include "tglobal.h"
#include "ttimer.h"
#include "tsocket.h"
#include "twal.h"
#include "tsync.h"
#include "syncInt.h"

S
TD-1926  
Shengliang Guan 已提交
29 30 31 32 33 34 35 36 37 38 39
/*
 * Ask the vnode for its current versions through pNode->getVersion and cache
 * the reported WAL version in pPeer->lastWalVer.
 *
 * Returns 0 on success. Returns -1 when getVersion reports a non-zero code;
 * pPeer->lastWalVer is left unchanged in that case.
 */
static int32_t syncGetWalVersion(SSyncNode *pNode, SSyncPeer *pPeer) {
  uint64_t fileVer, walVer;

  if ((*pNode->getVersion)(pNode->vgId, &fileVer, &walVer) != 0) {
    sDebug("%s, vnode is commiting while retrieve, last wver:%" PRIu64, pPeer->id, pPeer->lastWalVer);
    return -1;
  }

  pPeer->lastWalVer = walVer;
  return 0;
}
S
TD-1926  
Shengliang Guan 已提交
40

S
TD-1926  
Shengliang Guan 已提交
41 42 43
/*
 * Report whether the WAL version differs from the one cached in
 * pPeer->lastWalVer.
 *
 * A non-zero code from pNode->getVersion is treated as "modified".
 * The cached version itself is never updated here.
 */
static bool syncIsWalModified(SSyncNode *pNode, SSyncPeer *pPeer) {
  uint64_t fileVer, walVer;

  if ((*pNode->getVersion)(pNode->vgId, &fileVer, &walVer) != 0) {
    sDebug("%s, vnode is commiting while retrieve, last wver:%" PRIu64, pPeer->id, pPeer->lastWalVer);
    return true;
  }

  if (walVer == pPeer->lastWalVer) return false;

  sDebug("%s, wal is modified while retrieve, wver:%" PRIu64 ", last:%" PRIu64, pPeer->id, walVer, pPeer->lastWalVer);
  return true;
}

/*
 * Ask the vnode for its current versions through pNode->getVersion and cache
 * the reported file version in pPeer->lastFileVer.
 *
 * Returns 0 on success. Returns -1 when getVersion reports a non-zero code;
 * pPeer->lastFileVer is left unchanged in that case.
 */
static int32_t syncGetFileVersion(SSyncNode *pNode, SSyncPeer *pPeer) {
  uint64_t fileVer, walVer;

  if ((*pNode->getVersion)(pNode->vgId, &fileVer, &walVer) != 0) {
    sDebug("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer);
    return -1;
  }

  pPeer->lastFileVer = fileVer;
  return 0;
}

/*
 * Report whether the file version differs from the one cached in
 * pPeer->lastFileVer, and record the verdict in pPeer->fileChanged (1 or 0).
 *
 * A non-zero code from pNode->getVersion is treated as "modified".
 */
static bool syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) {
  uint64_t fileVer, walVer;
  bool     modified = false;

  if ((*pNode->getVersion)(pNode->vgId, &fileVer, &walVer) != 0) {
    sDebug("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer);
    modified = true;
  } else if (fileVer != pPeer->lastFileVer) {
    sDebug("%s, files are modified while retrieve, fver:%" PRIu64 ", last:%" PRIu64, pPeer->id, fileVer, pPeer->lastFileVer);
    modified = true;
  }

  pPeer->fileChanged = modified ? 1 : 0;
  return modified;
}

S
Shengliang Guan 已提交
88
/*
 * Send the vnode's data files to the peer over pPeer->syncFd.
 *
 * For each file reported by pNode->getFileInfo:
 *   1. send its SFileInfo to the peer;
 *   2. stop successfully when no file is reported (magic == 0 or empty name);
 *   3. read the peer's SFileAck; skip the file when ack.sync == 0;
 *   4. otherwise send the file content with taosSendFile;
 *   5. abort if the file version changed while the file was being sent.
 *
 * pPeer->sversion is set to the fversion of every acknowledged file.
 *
 * Returns 0 when all files were processed, -1 on any failure or when the
 * files were modified during the transfer.
 */
static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
  SSyncNode *pNode = pPeer->pSyncNode;
  SFileInfo  fileInfo; memset(&fileInfo, 0, sizeof(SFileInfo));
  SFileAck   fileAck = {0};
  int32_t    code = -1;
  char       name[TSDB_FILENAME_LEN * 2] = {0};

  // remember the file version so later modifications can be detected
  if (syncGetFileVersion(pNode, pPeer) < 0) return -1;

  while (1) {
    // retrieve file info
    fileInfo.name[0] = 0;
    fileInfo.size = 0;
    fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX,
                                           &fileInfo.size, &fileInfo.fversion);
    sDebug("%s, file:%s info is sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size);

    // send the file info
    int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo));
    if (ret < 0) {
      code = -1;
      sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
      break;
    }

    // if no file anymore, break
    if (fileInfo.magic == 0 || fileInfo.name[0] == 0) {
      code = 0;
      sDebug("%s, no more files to sync", pPeer->id);
      break;
    }

    // wait for the ack from peer
    ret = taosReadMsg(pPeer->syncFd, &fileAck, sizeof(fileAck));
    if (ret < 0) {
      code = -1;
      sError("%s, failed to read file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
      break;
    }

    // set the peer sync version
    pPeer->sversion = fileInfo.fversion;

    // if sync is not required, continue
    if (fileAck.sync == 0) {
      fileInfo.index++;
      sDebug("%s, %s is the same", pPeer->id, fileInfo.name);
      continue;
    }

    // get the full path to file
    snprintf(name, sizeof(name), "%s/%s", pNode->path, fileInfo.name);

    // send the file to peer
    int32_t sfd = open(name, O_RDONLY);
    if (sfd < 0) {
      code = -1;
      sError("%s, failed to open file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
      break;
    }

    // keep the result 64-bit wide: fileInfo.size is 64-bit, so a narrowed
    // result for a large file could be misread as a negative error value
    int64_t sent = taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size);
    int32_t sendErrno = errno;  // close() below may overwrite errno
    close(sfd);
    if (sent < 0) {
      code = -1;
      sError("%s, failed to send file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(sendErrno));
      break;
    }

    sDebug("%s, file:%s is sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size);
    fileInfo.index++;

    // check if processed files are modified
    if (syncAreFilesModified(pNode, pPeer)) {
      code = -1;
      break;
    }
  }

  if (code != TSDB_CODE_SUCCESS) {
    sError("%s, failed to retrieve file, code:0x%x", pPeer->id, code);
  }

  return code;
}

S
TD-1926  
Shengliang Guan 已提交
175 176 177 178 179 180 181
/*
 * Read one WAL record (SWalHead followed by pHead->len bytes of content) from
 * sfd into the buffer at pHead.
 *
 * Returns:
 *   > 0  size of the complete record (head + content) in bytes;
 *     0  end of file, or only a partial head/content was available; the
 *        upper layer will reload the file to get a complete record;
 *    -1  read error, or the head carries a length outside
 *        [0, TSDB_MAX_WAL_SIZE].
 *
 * NOTE(review): the caller's buffer is assumed to hold at least
 * sizeof(SWalHead) + TSDB_MAX_WAL_SIZE bytes — confirm against SYNC_MAX_SIZE.
 */
static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) {
  int32_t ret = read(sfd, pHead, sizeof(SWalHead));
  if (ret < 0) {
    sError("sfd:%d, failed to read wal head since %s, ret:%d", sfd, strerror(errno), ret);
    return -1;
  }

  if (ret == 0) {
    sTrace("sfd:%d, read to the end of file, ret:%d", sfd, ret);
    return 0;
  }

  if (ret != (int32_t)sizeof(SWalHead)) {
    // file is not at end yet, it shall be reloaded
    sDebug("sfd:%d, a partial wal head is read out, ret:%d", sfd, ret);
    return 0;
  }

  // The length comes from disk, so validate it at runtime instead of with
  // assert(): assert is compiled out under NDEBUG, which would let a corrupt
  // length drive the read below past the end of the buffer.
  int64_t contLen = (int64_t)pHead->len;
  if (contLen < 0 || contLen > TSDB_MAX_WAL_SIZE) {
    sError("sfd:%d, invalid wal head, len:%" PRId64, sfd, contLen);
    return -1;
  }

  ret = read(sfd, pHead->cont, pHead->len);
  if (ret < 0) {
    sError("sfd:%d, failed to read wal content since %s, ret:%d", sfd, strerror(errno), ret);
    return -1;
  }

  if (ret != pHead->len) {
    // file is not at end yet, it shall be reloaded
    sDebug("sfd:%d, a partial wal conetnt is read out, ret:%d", sfd, ret);
    return 0;
  }

  return sizeof(SWalHead) + pHead->len;
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
210

S
TD-1926  
Shengliang Guan 已提交
211 212 213 214
/*
 * Forward records of the last (still open) WAL file to the peer.
 *
 * Opens `name`, seeks to `offset`, then reads records one by one and writes
 * each to pPeer->syncFd, updating pPeer->sversion to the version of every
 * forwarded record.
 *
 * Returns:
 *   > 0 or 0  number of bytes forwarded when the end of the readable data
 *             was reached;
 *     0       also returned as soon as a record with version >= fversion has
 *             been forwarded (only when fversion > 0);
 *    -1       open/seek/allocation/read/write failure.
 */
static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset) {
  int32_t sfd = open(name, O_RDONLY);
  if (sfd < 0) {
    sError("%s, failed to open wal:%s for retrieve since:%s", pPeer->id, name, tstrerror(errno));
    return -1;
  }

  int32_t code = taosLSeek(sfd, offset, SEEK_SET);
  if (code < 0) {
    sError("%s, failed to seek %" PRId64 " in wal:%s for retrieve since:%s", pPeer->id, offset, name, tstrerror(errno));
    close(sfd);
    return -1;
  }

  sDebug("%s, retrieve last wal:%s, offset:%" PRId64 " fver:%" PRIu64, pPeer->id, name, offset, fversion);

  SWalHead *pHead = malloc(SYNC_MAX_SIZE);
  if (pHead == NULL) {
    // without this check the record reader below would write through NULL
    sError("%s, failed to retrieve last wal:%s since out of memory", pPeer->id, name);
    close(sfd);
    return -1;
  }

  int32_t bytes = 0;  // total bytes forwarded in this call

  while (1) {
    code = syncReadOneWalRecord(sfd, pHead);
    if (code < 0) {
      sError("%s, failed to read one record from wal:%s", pPeer->id, name);
      break;
    }

    if (code == 0) {
      // nothing more to read for now: report how much was forwarded
      code = bytes;
      sDebug("%s, read to the end of wal, bytes:%d", pPeer->id, bytes);
      break;
    }

    sTrace("%s, last wal is forwarded, hver:%" PRIu64, pPeer->id, pHead->version);

    int32_t wsize = code;
    int32_t ret = taosWriteMsg(pPeer->syncFd, pHead, wsize);
    if (ret != wsize) {
      code = -1;
      sError("%s, failed to forward wal since %s, hver:%" PRIu64, pPeer->id, strerror(errno), pHead->version);
      break;
    }

    pPeer->sversion = pHead->version;
    bytes += wsize;

    if (pHead->version >= fversion && fversion > 0) {
      code = 0;
      sDebug("%s, retrieve wal finished, hver:%" PRIu64 " fver:%" PRIu64, pPeer->id, pHead->version, fversion);
      break;
    }
  }

  free(pHead);
  close(sfd);

  return code;
}

S
Shengliang Guan 已提交
269
/*
 * Repeatedly forward the last WAL file (`wname`, relative to pNode->path) to
 * the peer until everything up to a target version has been sent.
 *
 * Each pass:
 *   - bails out with -1 if the data files changed or the WAL version cannot
 *     be fetched;
 *   - forwards new records starting at the current read position;
 *   - once the WAL was stable during a pass, or on any pass after the first,
 *     switches the peer to TAOS_SYNC_STATUS_CACHE and latches the target
 *     version from nodeVersion;
 *   - returns 0 once pPeer->sversion has reached the target version;
 *   - sleeps 10ms when nothing was read and the WAL did not change.
 *
 * `index` is not used by this function.
 */
static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) {
  // NOTE(review): the local must stay named pNode; nodeVersion (used below) is
  // not defined in this block and presumably expands to an expression over pNode.
  SSyncNode *pNode = pPeer->pSyncNode;
  char       walPath[TSDB_FILENAME_LEN * 2] = {0};  // full path to wal file
  uint64_t   targetVer = 0;                         // 0 means "not decided yet"
  int64_t    readPos = 0;                           // offset of the next unread byte
  int32_t    passedOnce = 0;                        // set after the first pass completes

  snprintf(walPath, sizeof(walPath), "%s/%s", pNode->path, wname);
  sDebug("%s, start to retrieve last wal:%s", pPeer->id, walPath);

  for (;;) {
    if (syncAreFilesModified(pNode, pPeer)) return -1;
    if (syncGetWalVersion(pNode, pPeer) < 0) return -1;

    int32_t nread = syncRetrieveLastWal(pPeer, walPath, targetVer, readPos);
    if (nread < 0) {
      sDebug("%s, failed to retrieve last wal", pPeer->id);
      return nread;
    }

    // did the wal grow while this pass was running?
    bool walChanged = syncIsWalModified(pNode, pPeer);

    // wal is stable, or this is at least the second pass: fix the target version
    if (targetVer == 0 && (passedOnce || !walChanged)) {
      pPeer->sstatus = TAOS_SYNC_STATUS_CACHE;  // start to forward pkt
      targetVer = nodeVersion;                  // must read data to this version
      sDebug("%s, set sstatus:%s and fver:%" PRIu64, pPeer->id, syncStatus[pPeer->sstatus], targetVer);
    }

    // everything up to the target version has been forwarded
    if (targetVer > 0 && pPeer->sversion >= targetVer) {
      sDebug("%s, data up to fver:%" PRIu64 " has been read out, bytes:%d sver:%" PRIu64, pPeer->id, targetVer, nread,
             pPeer->sversion);
      return 0;
    }

    // nothing new and no update: data may not be flushed to disk yet, wait a while
    if (nread == 0 && !walChanged) {
      usleep(10000);
    }

    // go around again from where this pass stopped
    passedOnce = 1;
    readPos += nread;
    sDebug("%s, continue retrieve last wal, bytes:%d offset:%" PRId64, pPeer->id, nread, readPos);
  }

  return -1;
}

S
Shengliang Guan 已提交
324
/*
 * Send all WAL files of the vnode to the peer over pPeer->syncFd.
 *
 * pNode->getWalInfo is called repeatedly:
 *   - a negative code aborts the transfer;
 *   - an empty name means there is no (more) WAL file: success;
 *   - code == 0 marks the last WAL, which is handed to syncProcessLastWal;
 *   - any other code is a closed WAL file that is sent whole with taosSendFile,
 *     after which the data files are re-checked for modification.
 *
 * On success the peer is switched to TAOS_SYNC_STATUS_CACHE and an all-zero
 * SWalHead is written to the socket after the WAL data.
 *
 * Returns 0 on success, a non-zero value on failure.
 */
static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
  SSyncNode * pNode = pPeer->pSyncNode;
  char        fname[TSDB_FILENAME_LEN * 3];
  char        wname[TSDB_FILENAME_LEN * 2];
  int32_t     code = -1;
  int64_t     index = 0;

  while (1) {
    // retrieve wal info
    wname[0] = 0;
    code = (*pNode->getWalInfo)(pNode->vgId, wname, &index);
    if (code < 0) {
      sError("%s, failed to get wal info since:%s, code:0x%x", pPeer->id, strerror(errno), code);
      break;
    }

    if (wname[0] == 0) {  // no wal file
      code = 0;
      sDebug("%s, no wal file anymore", pPeer->id);
      break;
    }

    if (code == 0) {  // last wal
      code = syncProcessLastWal(pPeer, wname, index);
      break;
    }

    // get the full path to wal file
    snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname);

    // send wal file, old wal file won't be modified, even remove is ok
    struct stat walStat;
    if (stat(fname, &walStat) < 0) {
      code = -1;
      sDebug("%s, failed to stat wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code);
      break;
    }

    // keep the size 64-bit wide so st_size is not truncated for large files
    int64_t size = walStat.st_size;
    sDebug("%s, retrieve wal:%s size:%" PRId64, pPeer->id, fname, size);

    int32_t sfd = open(fname, O_RDONLY);
    if (sfd < 0) {
      code = -1;
      sError("%s, failed to open wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code);
      break;
    }

    // a 64-bit result avoids misreading a large transfer as a negative error
    int64_t sent = taosSendFile(pPeer->syncFd, sfd, NULL, size);
    int32_t sendErrno = errno;  // close() below may overwrite errno
    close(sfd);
    if (sent < 0) {
      code = -1;
      sError("%s, failed to send wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(sendErrno), code);
      break;
    }

    if (syncAreFilesModified(pNode, pPeer)) {
      code = -1;
      break;
    }
  }

  if (code == 0) {
    pPeer->sstatus = TAOS_SYNC_STATUS_CACHE;
    sInfo("%s, wal retrieve is finished, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);

    // an all-zero wal head is written after the wal data; report a failed
    // write instead of returning success without it having been sent
    SWalHead walHead;
    memset(&walHead, 0, sizeof(walHead));
    if (taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead)) < 0) {
      code = -1;
      sError("%s, failed to send wal end mark since %s, code:0x%x", pPeer->id, strerror(errno), code);
    }
  } else {
    sError("%s, failed to send wal since %s, code:0x%x", pPeer->id, strerror(errno), code);
  }

  return code;
}

S
TD-2211  
Shengliang Guan 已提交
400
/*
 * Send the first sync packet to the peer over pPeer->syncFd and read the
 * peer's response.
 *
 * The packet carries the TAOS_SMSG_SYNC_DATA type, the vgroup id, this node's
 * FQDN (tsNodeFqdn) and the sync port (tsSyncPort). The response content is
 * read but not inspected.
 *
 * Returns 0 on success, -1 if the write or the read fails.
 */
static int32_t syncRetrieveFirstPkt(SSyncPeer *pPeer) {
  SSyncNode *pNode = pPeer->pSyncNode;

  // memset (rather than an initializer) so every byte sent on the wire is zeroed
  SFirstPkt request;
  memset(&request, 0, sizeof(request));
  request.syncHead.type = TAOS_SMSG_SYNC_DATA;
  request.syncHead.vgId = pNode->vgId;
  request.port = tsSyncPort;
  tstrncpy(request.fqdn, tsNodeFqdn, sizeof(request.fqdn));

  int32_t written = taosWriteMsg(pPeer->syncFd, &request, sizeof(request));
  if (written < 0) {
    sError("%s, failed to send sync firstPkt since %s", pPeer->id, strerror(errno));
    return -1;
  }

  SFirstPktRsp reply;
  int32_t      received = taosReadMsg(pPeer->syncFd, &reply, sizeof(reply));
  if (received < 0) {
    sError("%s, failed to read sync firstPkt rsp since %s", pPeer->id, strerror(errno));
    return -1;
  }

  return 0;
}

/*
 * Run the retrieve sequence for one peer: first packet, then data files,
 * then WAL files.
 *
 * Returns 0 when every step succeeded, -1 as soon as one step fails.
 */
static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
  sInfo("%s, start to retrieve, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);

  // step 1: first packet and its response
  if (syncRetrieveFirstPkt(pPeer) < 0) {
    sError("%s, failed to start retrieve", pPeer->id);
    return -1;
  }

  // step 2: data files
  pPeer->sversion = 0;
  pPeer->sstatus = TAOS_SYNC_STATUS_FILE;
  sInfo("%s, start to retrieve files, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);

  int32_t fileCode = syncRetrieveFile(pPeer);
  if (fileCode != 0) {
    sError("%s, failed to retrieve files", pPeer->id);
    return -1;
  }

  // if no files are synced, there must be wal to sync, sversion must be larger than one
  if (pPeer->sversion == 0) {
    pPeer->sversion = 1;
  }

  // step 3: wal files
  sInfo("%s, start to retrieve wals", pPeer->id);

  int32_t walCode = syncRetrieveWal(pPeer);
  if (walCode == 0) {
    return 0;
  }

  sError("%s, failed to retrieve wals, code:0x%x", pPeer->id, walCode);
  return -1;
}

S
Shengliang Guan 已提交
452
void *syncRetrieveData(void *param) {
S
TD-1617  
Shengliang Guan 已提交
453 454
  SSyncPeer *pPeer = (SSyncPeer *)param;
  SSyncNode *pNode = pPeer->pSyncNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
455 456
  taosBlockSIGPIPE();

S
Shengliang Guan 已提交
457 458
  if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, pPeer->numOfRetrieves);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
459 460 461 462 463
  pPeer->syncFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
  if (pPeer->syncFd < 0) {
    sError("%s, failed to open socket to sync", pPeer->id);
  } else {
    sInfo("%s, sync tcp is setup", pPeer->id);
S
TD-1617  
Shengliang Guan 已提交
464

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
465
    if (syncRetrieveDataStepByStep(pPeer) == 0) {
S
TD-2153  
Shengliang Guan 已提交
466
      sInfo("%s, sync retrieve process is successful", pPeer->id);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
467 468 469 470 471 472
    } else {
      sError("%s, failed to retrieve data, restart connection", pPeer->id);
      syncRestartConnection(pPeer);
    }
  }

473 474 475 476
  if (pPeer->fileChanged) {
    pPeer->numOfRetrieves++;
  } else {
    pPeer->numOfRetrieves = 0;
477
    if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, 0);
478 479 480
  }

  pPeer->fileChanged = 0;
S
Shengliang Guan 已提交
481
  taosClose(pPeer->syncFd);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
482 483 484 485
  syncDecPeerRef(pPeer);

  return NULL;
}