/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include <sys/inotify.h>
#include "os.h"
#include "taoserror.h"
#include "tlog.h"
#include "tutil.h"
#include "tglobal.h"
#include "ttimer.h"
#include "tsocket.h"
#include "twal.h"
#include "tsync.h"
#include "syncInt.h"

/*
 * Query the node's getVersion callback and cache the reported WAL version
 * in pPeer->lastWalVer.
 *
 * Returns 0 when the callback succeeds, -1 when it reports a non-zero code
 * (in which case pPeer->lastWalVer is left untouched).
 */
static int32_t syncGetWalVersion(SSyncNode *pNode, SSyncPeer *pPeer) {
  uint64_t fileVer = 0;
  uint64_t walVer = 0;

  if ((*pNode->getVersion)(pNode->vgId, &fileVer, &walVer) != 0) {
    sDebug("%s, vnode is commiting while retrieve, last wver:%" PRIu64, pPeer->id, pPeer->lastWalVer);
    return -1;
  }

  // only the wal version is of interest here; the file version is discarded
  pPeer->lastWalVer = walVer;
  return 0;
}

/*
 * Report whether the WAL version differs from the one cached in
 * pPeer->lastWalVer.
 *
 * Returns true when the getVersion callback fails or when the reported wal
 * version is not equal to pPeer->lastWalVer; false otherwise. The cached
 * version is not updated here.
 */
static bool syncIsWalModified(SSyncNode *pNode, SSyncPeer *pPeer) {
  uint64_t fver, wver;
  int32_t  code = (*pNode->getVersion)(pNode->vgId, &fver, &wver);
  if (code != 0) {
    // a failing version query is treated as "modified"
    sDebug("%s, vnode is commiting while retrieve, last wver:%" PRIu64, pPeer->id, pPeer->lastWalVer);
    return true;
  }

  if (wver != pPeer->lastWalVer) {
    sDebug("%s, wal is modified while retrieve, wver:%" PRIu64 ", last:%" PRIu64, pPeer->id, wver, pPeer->lastWalVer);
    return true;
  }

  return false;
}

/*
 * Query the node's getVersion callback and cache the reported file version
 * in pPeer->lastFileVer.
 *
 * Returns 0 on success, -1 when the callback reports a non-zero code (the
 * cached value is then left unchanged).
 */
static int32_t syncGetFileVersion(SSyncNode *pNode, SSyncPeer *pPeer) {
  uint64_t fver, wver;
  int32_t  code = (*pNode->getVersion)(pNode->vgId, &fver, &wver);
  if (code != 0) {
    sDebug("%s, vnode is commiting while get fver for retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer);
    return -1;
  }

  pPeer->lastFileVer = fver;
  return code;  // code is 0 here
}

/*
 * Report whether the file version differs from the one cached in
 * pPeer->lastFileVer, and record the answer in pPeer->fileChanged.
 *
 * Returns true (and sets pPeer->fileChanged = 1) when the getVersion
 * callback fails or the reported file version is not equal to
 * pPeer->lastFileVer. Returns false (and sets pPeer->fileChanged = 0)
 * otherwise.
 */
static bool syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) {
  uint64_t fver, wver;
  int32_t  code = (*pNode->getVersion)(pNode->vgId, &fver, &wver);
  if (code != 0) {
    // a failing version query is treated as "modified"
    sDebug("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer);
    pPeer->fileChanged = 1;
    return true;
  }

  if (fver != pPeer->lastFileVer) {
    sDebug("%s, files are modified while retrieve, fver:%" PRIu64 ", last:%" PRIu64, pPeer->id, fver, pPeer->lastFileVer);
    pPeer->fileChanged = 1;
    return true;
  }

  pPeer->fileChanged = 0;
  return false;
}

/*
 * Send the node's files to the peer over pPeer->syncFd.
 *
 * The current file version is cached first (syncGetFileVersion). Then, for
 * each file reported by the getFileInfo callback:
 *   1. an SFileInfo message is built and written to the peer;
 *   2. if no file is left (magic == 0 or empty name) the loop ends with
 *      success;
 *   3. the peer's SFileAck is read and validated with syncCheckHead;
 *   4. if the ack asks for it (fileAck.sync != 0) the file content is sent
 *      with taosSendFile;
 *   5. the file version is re-checked; a change aborts the retrieve.
 *
 * pPeer->sversion is set to the fversion of each acknowledged file.
 *
 * Returns 0 when all files were processed, -1 on any I/O failure, invalid
 * ack, or when the files were modified meanwhile.
 */
static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
  SSyncNode *pNode = pPeer->pSyncNode;
  SFileInfo  fileInfo;
  SFileAck   fileAck;
  int32_t    code = -1;
  char       name[TSDB_FILENAME_LEN * 2] = {0};

  memset(&fileInfo, 0, sizeof(SFileInfo));
  memset(&fileAck, 0, sizeof(SFileAck));

  if (syncGetFileVersion(pNode, pPeer) < 0) {
    pPeer->fileChanged = 1;
    return -1;
  }

  while (1) {
    // retrieve file info
    fileInfo.name[0] = 0;
    fileInfo.size = 0;
    fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX,
                                           &fileInfo.size, &fileInfo.fversion);
    syncBuildFileInfo(&fileInfo, pNode->vgId);
    sDebug("%s, file:%s info is sent, index:%d size:%" PRId64 " fver:%" PRIu64 " magic:%u", pPeer->id, fileInfo.name,
           fileInfo.index, fileInfo.size, fileInfo.fversion, fileInfo.magic);

    // send the file info; it is sent even when no file is left, so the peer sees the end marker
    int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(SFileInfo));
    if (ret != sizeof(SFileInfo)) {
      code = -1;
      sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
      break;
    }

    // if no file anymore, break
    if (fileInfo.magic == 0 || fileInfo.name[0] == 0) {
      code = 0;
      sDebug("%s, no more files to sync", pPeer->id);
      break;
    }

    // wait for the ack from peer
    ret = taosReadMsg(pPeer->syncFd, &fileAck, sizeof(SFileAck));
    if (ret != sizeof(SFileAck)) {
      code = -1;
      sError("%s, failed to read file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
      break;
    }

    ret = syncCheckHead((SSyncHead*)(&fileAck));
    if (ret != 0) {
      code = -1;
      sError("%s, failed to check file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(ret));
      break;
    }

    // set the peer sync version
    pPeer->sversion = fileInfo.fversion;

    // if sync is not required, continue
    if (fileAck.sync == 0) {
      fileInfo.index++;
      sDebug("%s, %s is the same, fver:%" PRIu64, pPeer->id, fileInfo.name, fileInfo.fversion);
      continue;
    } else {
      sDebug("%s, %s will be sent, fver:%" PRIu64, pPeer->id, fileInfo.name, fileInfo.fversion);
    }

    // get the full path to file
    snprintf(name, sizeof(name), "%s/%s", pNode->path, fileInfo.name);

    // send the file to peer
    int32_t sfd = open(name, O_RDONLY);
    if (sfd < 0) {
      code = -1;
      sError("%s, failed to open file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
      break;
    }

    ret = taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size);
    close(sfd);
    if (ret < 0) {
      code = -1;
      sError("%s, failed to send file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
      break;
    }

    sDebug("%s, file:%s is sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size);
    fileInfo.index++;

    // check if processed files are modified
    if (syncAreFilesModified(pNode, pPeer)) {
      code = -1;
      break;
    }
  }

  if (code != TSDB_CODE_SUCCESS) {
    sError("%s, failed to retrieve file, code:0x%x", pPeer->id, code);
  }

  return code;
}

/*
 * Read one WAL record (an SWalHead followed by pHead->len bytes of content)
 * from sfd into the buffer at pHead.
 *
 * If only a partial record is read out, 0 is returned and the upper layer
 * will reload the file to get a complete record.
 *
 * Returns:
 *   > 0  total size of the record (sizeof(SWalHead) + pHead->len)
 *   0    end of file, or only a partial head/content was available
 *   -1   read() failed, or the head announces a length larger than
 *        TSDB_MAX_WAL_SIZE
 */
static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) {
  int32_t ret = read(sfd, pHead, sizeof(SWalHead));
  if (ret < 0) {
    sError("sfd:%d, failed to read wal head since %s, ret:%d", sfd, strerror(errno), ret);
    return -1;
  }

  if (ret == 0) {
    sDebug("sfd:%d, read to the end of file, ret:%d", sfd, ret);
    return 0;
  }

  if (ret != sizeof(SWalHead)) {
    // file is not at end yet, it shall be reloaded
    sDebug("sfd:%d, a partial wal head is read out, ret:%d", sfd, ret);
    return 0;
  }

  // The length comes from disk: reject it at runtime instead of relying on
  // assert(), which is compiled out under NDEBUG and would let an oversized
  // length drive the read() below past the caller's buffer.
  if (pHead->len > TSDB_MAX_WAL_SIZE) {
    sError("sfd:%d, invalid wal head, len exceeds the max wal size", sfd);
    return -1;
  }

  ret = read(sfd, pHead->cont, pHead->len);
  if (ret < 0) {
    sError("sfd:%d, failed to read wal content since %s, ret:%d", sfd, strerror(errno), ret);
    return -1;
  }

  if (ret != pHead->len) {
    // file is not at end yet, it shall be reloaded
    sDebug("sfd:%d, a partial wal conetnt is read out, ret:%d", sfd, ret);
    return 0;
  }

  return sizeof(SWalHead) + pHead->len;
}

/*
 * Forward records of the last WAL file `name` to the peer, starting at byte
 * `offset`.
 *
 * Records are read one at a time with syncReadOneWalRecord and written to
 * pPeer->syncFd; pPeer->sversion is updated to the version of each record
 * that was forwarded.
 *
 * Returns:
 *   < 0   open/seek/allocation/read/write failure
 *   0     a record with version >= fversion was forwarded (only when
 *         fversion > 0)
 *   >= 0  otherwise, the number of bytes forwarded before the end of the
 *         file (or a partial record) was reached
 */
static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset) {
  int32_t sfd = open(name, O_RDONLY);
  if (sfd < 0) {
    sError("%s, failed to open wal:%s for retrieve since:%s", pPeer->id, name, tstrerror(errno));
    return -1;
  }

  int32_t code = taosLSeek(sfd, offset, SEEK_SET);
  if (code < 0) {
    sError("%s, failed to seek %" PRId64 " in wal:%s for retrieve since:%s", pPeer->id, offset, name, tstrerror(errno));
    close(sfd);
    return -1;
  }

  sDebug("%s, retrieve last wal:%s, offset:%" PRId64 " fver:%" PRIu64, pPeer->id, name, offset, fversion);

  SWalHead *pHead = malloc(SYNC_MAX_SIZE);
  if (pHead == NULL) {
    // without this check a failed allocation would be dereferenced by the read below
    sError("%s, failed to allocate buffer to retrieve wal:%s", pPeer->id, name);
    close(sfd);
    return -1;
  }

  int32_t bytes = 0;  // total bytes forwarded in this call

  while (1) {
    code = syncReadOneWalRecord(sfd, pHead);
    if (code < 0) {
      sError("%s, failed to read one record from wal:%s", pPeer->id, name);
      break;
    }

    if (code == 0) {
      // nothing (complete) left to read; report how much was forwarded
      code = bytes;
      sDebug("%s, read to the end of wal, bytes:%d", pPeer->id, bytes);
      break;
    }

    sDebug("%s, last wal is forwarded, hver:%" PRIu64, pPeer->id, pHead->version);

    int32_t wsize = code;
    int32_t ret = taosWriteMsg(pPeer->syncFd, pHead, wsize);
    if (ret != wsize) {
      code = -1;
      sError("%s, failed to forward wal since %s, hver:%" PRIu64, pPeer->id, strerror(errno), pHead->version);
      break;
    }

    pPeer->sversion = pHead->version;
    bytes += wsize;

    if (pHead->version >= fversion && fversion > 0) {
      code = 0;
      sDebug("%s, retrieve wal finished, hver:%" PRIu64 " fver:%" PRIu64, pPeer->id, pHead->version, fversion);
      break;
    }
  }

  free(pHead);
  close(sfd);

  return code;
}

/*
 * Forward the last WAL file to the peer, re-reading it until everything up
 * to a target version has been sent.
 *
 * Each iteration aborts if the files changed, refreshes the cached wal
 * version, forwards what is available from the current offset
 * (syncRetrieveLastWal), then checks whether the wal changed meanwhile.
 * Once the wal is seen unmodified (or from the second pass on), the peer is
 * switched to TAOS_SYNC_STATUS_CACHE and the target version `fversion` is
 * fixed. The loop ends when pPeer->sversion reaches that target.
 *
 * `index` is not used in this function body.
 *
 * Returns 0 on success, a negative value when the files were modified, the
 * wal version could not be read, or forwarding failed.
 */
static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) {
  SSyncNode *pNode = pPeer->pSyncNode;
  int32_t    once = 0;  // last WAL has once ever been processed
  int64_t    offset = 0;
  uint64_t   fversion = 0;
  char       fname[TSDB_FILENAME_LEN * 2] = {0};  // full path to wal file

  // get full path to wal file
  snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname);
  sDebug("%s, start to retrieve last wal:%s", pPeer->id, fname);

  while (1) {
    if (syncAreFilesModified(pNode, pPeer)) return -1;
    if (syncGetWalVersion(pNode, pPeer) < 0) return -1;

    int32_t bytes = syncRetrieveLastWal(pPeer, fname, fversion, offset);
    if (bytes < 0) {
      sDebug("%s, failed to retrieve last wal", pPeer->id);
      return bytes;
    }

    // check file changes
    bool walModified = syncIsWalModified(pNode, pPeer);

    // if file is not updated or updated once, set the fversion and sstatus
    if (!walModified || once) {
      if (fversion == 0) {
        pPeer->sstatus = TAOS_SYNC_STATUS_CACHE;  // start to forward pkt
        // NOTE(review): nodeVersion is not defined in this file; presumably a
        // macro from syncInt.h that reads through the local pNode - confirm
        fversion = nodeVersion;                   // must read data to fversion
        sDebug("%s, set sstatus:%s and fver:%" PRIu64, pPeer->id, syncStatus[pPeer->sstatus], fversion);
      }
    }

    // if all data up to fversion is read out, it is over
    if (pPeer->sversion >= fversion && fversion > 0) {
      sDebug("%s, data up to fver:%" PRIu64 " has been read out, bytes:%d sver:%" PRIu64, pPeer->id, fversion, bytes,
             pPeer->sversion);
      return 0;
    }

    // if all data are read out, and no update
    if (bytes == 0 && !walModified) {
      // wal not closed, it means some data not flushed to disk, wait for a while
      usleep(10000);
    }

    // if bytes > 0, file is updated, or fversion is not reached but file still open, read again
    once = 1;
    offset += bytes;
    sDebug("%s, continue retrieve last wal, bytes:%d offset:%" PRId64 " sver:%" PRIu64 " fver:%" PRIu64, pPeer->id,
           bytes, offset, pPeer->sversion, fversion);
  }

  return -1;
}

/*
 * Send the node's WAL files to the peer over pPeer->syncFd.
 *
 * The getWalInfo callback is queried repeatedly:
 *   - a negative code aborts the retrieve;
 *   - an empty name means there is no wal file (left) and ends with success;
 *   - code == 0 marks the last wal, which is handed to syncProcessLastWal;
 *   - otherwise the named file is sent in full with taosSendFile, after
 *     which the file version is re-checked.
 *
 * On success an all-zero SWalHead is written to the peer and the peer status
 * becomes TAOS_SYNC_STATUS_CACHE.
 *
 * Returns 0 on success, non-zero on failure.
 */
static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
  SSyncNode * pNode = pPeer->pSyncNode;
  char        fname[TSDB_FILENAME_LEN * 3];
  char        wname[TSDB_FILENAME_LEN * 2];
  int32_t     code = -1;
  int64_t     index = 0;

  while (1) {
    // retrieve wal info
    wname[0] = 0;
    code = (*pNode->getWalInfo)(pNode->vgId, wname, &index);
    if (code < 0) {
      sError("%s, failed to get wal info since:%s, code:0x%x", pPeer->id, strerror(errno), code);
      break;
    }

    if (wname[0] == 0) {  // no wal file
      code = 0;
      sDebug("%s, no wal file anymore", pPeer->id);
      break;
    }

    if (code == 0) {  // last wal
      code = syncProcessLastWal(pPeer, wname, index);
      break;
    }

    // get the full path to wal file
    snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname);

    // send wal file, old wal file won't be modified, even remove is ok
    struct stat st;  // not named fstat, to avoid shadowing the libc function
    if (stat(fname, &st) < 0) {
      code = -1;
      sDebug("%s, failed to stat wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code);
      break;
    }

    // keep the full 64-bit size; narrowing st_size to int32_t truncates large files
    int64_t size = (int64_t)st.st_size;
    sDebug("%s, retrieve wal:%s size:%" PRId64, pPeer->id, fname, size);

    int32_t sfd = open(fname, O_RDONLY);
    if (sfd < 0) {
      code = -1;
      sError("%s, failed to open wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code);
      break;
    }

    code = taosSendFile(pPeer->syncFd, sfd, NULL, size);
    close(sfd);
    if (code < 0) {
      sError("%s, failed to send wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code);
      break;
    }

    if (syncAreFilesModified(pNode, pPeer)) {
      code = -1;
      break;
    }
  }

  if (code == 0) {
    // an all-zero wal head is written to the peer once all wals are sent
    SWalHead walHead;
    memset(&walHead, 0, sizeof(walHead));
    if (taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead)) == sizeof(walHead)) {
      pPeer->sstatus = TAOS_SYNC_STATUS_CACHE;
      sInfo("%s, wal retrieve is finished, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
    } else {
      sError("%s, failed to send last wal record since %s", pPeer->id, strerror(errno));
      code = -1;
    }
  } else {
    sError("%s, failed to send wal since %s, code:0x%x", pPeer->id, strerror(errno), code);
  }

  return code;
}

/*
 * Start the retrieve exchange: build a sync-data message with
 * syncBuildSyncDataMsg, write it to pPeer->syncFd and read back an SSyncRsp.
 *
 * The response content is only logged; it is not validated here.
 *
 * Returns 0 when both the write and the read transfer the full message size,
 * -1 otherwise.
 */
static int32_t syncRetrieveFirstPkt(SSyncPeer *pPeer) {
  SSyncNode *pNode = pPeer->pSyncNode;

  SSyncMsg msg;
  syncBuildSyncDataMsg(&msg, pNode->vgId);

  if (taosWriteMsg(pPeer->syncFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
    sError("%s, failed to send sync-data msg since %s, tranId:%u", pPeer->id, strerror(errno), msg.tranId);
    return -1;
  }
  sDebug("%s, send sync-data msg to peer, tranId:%u", pPeer->id, msg.tranId);

  SSyncRsp rsp;
  if (taosReadMsg(pPeer->syncFd, &rsp, sizeof(SSyncRsp)) != sizeof(SSyncRsp)) {
    sError("%s, failed to read sync-data rsp since %s, tranId:%u", pPeer->id, strerror(errno), msg.tranId);
    return -1;
  }

  sDebug("%s, recv sync-data rsp from peer, tranId:%u rsp-tranId:%u", pPeer->id, msg.tranId, rsp.tranId);
  return 0;
}

/*
 * Run the three retrieve phases in order: first packet exchange, files,
 * then wals.
 *
 * pPeer->sversion is reset to 0 and pPeer->sstatus set to
 * TAOS_SYNC_STATUS_FILE before the file phase.
 *
 * Returns 0 when all phases succeed, -1 as soon as one fails.
 */
static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
  sInfo("%s, start to retrieve, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
  if (syncRetrieveFirstPkt(pPeer) < 0) {
    sError("%s, failed to start retrieve", pPeer->id);
    return -1;
  }

  pPeer->sversion = 0;
  pPeer->sstatus = TAOS_SYNC_STATUS_FILE;
  sInfo("%s, start to retrieve files, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
  if (syncRetrieveFile(pPeer) != 0) {
    sError("%s, failed to retrieve files", pPeer->id);
    return -1;
  }

  // if no files are synced, there must be wal to sync, sversion must be larger than one
  if (pPeer->sversion == 0) pPeer->sversion = 1;

  sInfo("%s, start to retrieve wals", pPeer->id);
  int32_t code = syncRetrieveWal(pPeer);
  if (code != 0) {
    sError("%s, failed to retrieve wals, code:0x%x", pPeer->id, code);
    return -1;
  }

  return 0;
}

void *syncRetrieveData(void *param) {
S
TD-1927  
Shengliang Guan 已提交
468 469
  int64_t    rid = (int64_t)param;
  SSyncPeer *pPeer = syncAcquirePeer(rid);
S
TD-2616  
Shengliang Guan 已提交
470 471 472 473
  if (pPeer == NULL) {
    sError("failed to retrieve data, invalid peer rid:%" PRId64, rid);
    return NULL;
  }
S
TD-1927  
Shengliang Guan 已提交
474

S
TD-1617  
Shengliang Guan 已提交
475
  SSyncNode *pNode = pPeer->pSyncNode;
S
TD-2616  
Shengliang Guan 已提交
476

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
477
  taosBlockSIGPIPE();
S
TD-2616  
Shengliang Guan 已提交
478
  sInfo("%s, start to retrieve data, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
479

S
Shengliang Guan 已提交
480 481
  if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, pPeer->numOfRetrieves);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
482 483 484 485 486
  pPeer->syncFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
  if (pPeer->syncFd < 0) {
    sError("%s, failed to open socket to sync", pPeer->id);
  } else {
    sInfo("%s, sync tcp is setup", pPeer->id);
S
TD-1617  
Shengliang Guan 已提交
487

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
488
    if (syncRetrieveDataStepByStep(pPeer) == 0) {
S
TD-2153  
Shengliang Guan 已提交
489
      sInfo("%s, sync retrieve process is successful", pPeer->id);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
490 491 492 493 494 495
    } else {
      sError("%s, failed to retrieve data, restart connection", pPeer->id);
      syncRestartConnection(pPeer);
    }
  }

496 497 498 499
  if (pPeer->fileChanged) {
    pPeer->numOfRetrieves++;
  } else {
    pPeer->numOfRetrieves = 0;
500
    if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, 0);
501 502 503
  }

  pPeer->fileChanged = 0;
S
Shengliang Guan 已提交
504
  taosClose(pPeer->syncFd);
S
TD-2616  
Shengliang Guan 已提交
505 506

  // The ref is obtained in both the create thread and the current thread, so it is released twice
S
Shengliang Guan 已提交
507 508
  sInfo("%s, sync retrieve data over, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);

S
TD-2616  
Shengliang Guan 已提交
509
  syncReleasePeer(pPeer);
S
TD-1927  
Shengliang Guan 已提交
510
  syncReleasePeer(pPeer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
511 512 513

  return NULL;
}