syncRestore.c 10.3 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
17
#include "os.h"
S
TD-2000  
Shengliang Guan 已提交
18
#include "taoserror.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
19 20 21 22 23 24 25 26 27
#include "tlog.h"
#include "tutil.h"
#include "ttimer.h"
#include "tsocket.h"
#include "tqueue.h"
#include "twal.h"
#include "tsync.h"
#include "syncInt.h"

S
TD-1617  
Shengliang Guan 已提交
28 29 30 31
/*
 * Remove local files whose index lies in [sindex, eindex].
 *
 * Each round asks pNode->getFileInfo for a file starting at the current index
 * (the callback may advance the index it is given) and removes the returned
 * name under pNode->path. The walk ends when the callback returns a zero
 * magic or the index moves past eindex. Nothing is done when sindex is
 * negative or the range is empty.
 */
static void syncRemoveExtraFile(SSyncPeer *pPeer, int32_t sindex, int32_t eindex) {
  SSyncNode *pNode = pPeer->pSyncNode;
  char       relName[TSDB_FILENAME_LEN * 2] = {0};
  char       fullName[TSDB_FILENAME_LEN * 3] = {0};
  uint64_t   fversion;
  int64_t    size;

  if (sindex < 0 || eindex < sindex) return;

  for (uint32_t index = sindex; index <= (uint32_t)eindex; ++index) {
    relName[0] = 0;

    uint32_t magic = (*pNode->getFileInfo)(pNode->vgId, relName, &index, eindex, &size, &fversion);
    if (magic == 0) break;  // callback reported no further file

    snprintf(fullName, sizeof(fullName), "%s/%s", pNode->path, relName);
    (void)remove(fullName);  // result is intentionally ignored
    sDebug("%s, %s is removed", pPeer->id, fullName);
  }
}

S
Shengliang Guan 已提交
53
/*
 * Receive file-info records from the peer over pPeer->syncFd and bring the
 * local files under pNode->path in line with them.
 *
 * For each record: local files with indexes between the previous record and
 * this one are removed, the local file info is looked up via
 * pNode->getFileInfo, an SFileAck is sent back, and if the ack requests a
 * sync, minfo.size bytes are copied from the socket into the file.
 * A record with an empty name or a zero magic ends the exchange.
 *
 * fversion: out; reset to 0 on entry, and set to the fversion of the last
 *           record received only when at least one file was rewritten.
 * Returns:  1 if the exchange finished and at least one file was rewritten,
 *           0 if it finished with nothing rewritten, -1 on failure.
 */
static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
  SSyncNode *pNode = pPeer->pSyncNode;
  SFileInfo  minfo;  // master file info
  SFileInfo  sinfo;  // slave file info
  SFileAck   fileAck;
  int32_t    code = -1;
  char       name[TSDB_FILENAME_LEN * 2] = {0};
  uint32_t   pindex = 0;  // index in last restore
  bool       fileChanged = false;

  memset(&minfo, 0, sizeof(minfo));
  memset(&sinfo, 0, sizeof(sinfo));

  *fversion = 0;
  sinfo.index = 0;
  while (1) {
    // read file info
    // NOTE(review): only ret < 0 is treated as failure; if taosReadMsg can
    // return a short count, minfo would be partially filled - confirm against
    // its contract.
    int32_t ret = taosReadMsg(pPeer->syncFd, &(minfo), sizeof(minfo));
    if (ret < 0) {
      sError("%s, failed to read file info while restore file since %s", pPeer->id, strerror(errno));
      break;
    }

    // the name arrives from the network: terminate it before it is logged,
    // copied into sinfo, or handed to getFileInfo as a C string
    minfo.name[sizeof(minfo.name) - 1] = 0;

    // if no more file from master, break;
    if (minfo.name[0] == 0 || minfo.magic == 0) {
      sDebug("%s, no more files to restore", pPeer->id);

      // remove extra files after the current index
      syncRemoveExtraFile(pPeer, sinfo.index + 1, TAOS_SYNC_MAX_INDEX);
      code = 0;
      break;
    }

    // remove extra files on slave between the current and last index
    syncRemoveExtraFile(pPeer, pindex + 1, minfo.index - 1);
    pindex = minfo.index;

    // check the file info
    sinfo = minfo;
    sDebug("%s, get file:%s info size:%" PRId64, pPeer->id, minfo.name, minfo.size);
    sinfo.magic = (*pNode->getFileInfo)(pNode->vgId, sinfo.name, &sinfo.index, TAOS_SYNC_MAX_INDEX, &sinfo.size,
                                        &sinfo.fversion);

    // if file not there or magic is not the same, file shall be synced
    memset(&fileAck, 0, sizeof(fileAck));
    fileAck.sync = (sinfo.magic != minfo.magic || sinfo.name[0] == 0) ? 1 : 0;

    // send file ack
    ret = taosWriteMsg(pPeer->syncFd, &fileAck, sizeof(fileAck));
    if (ret < 0) {
      sError("%s, failed to write file:%s ack while restore file since %s", pPeer->id, minfo.name, strerror(errno));
      break;
    }

    // if sync is not required, continue
    if (fileAck.sync == 0) {
      sDebug("%s, %s is the same", pPeer->id, minfo.name);
      continue;
    }

    // if sync is required, open file, receive from master, and write to file
    // get the full path to file
    snprintf(name, sizeof(name), "%s/%s", pNode->path, minfo.name);

    int32_t dfd = open(name, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
    if (dfd < 0) {
      sError("%s, failed to open file:%s while restore file since %s", pPeer->id, minfo.name, strerror(errno));
      break;
    }

    ret = taosCopyFds(pPeer->syncFd, dfd, minfo.size);
    int copyErrno = errno;  // fsync/close below may overwrite errno
    fsync(dfd);
    close(dfd);
    if (ret < 0) {
      errno = copyErrno;  // report the reason the copy failed, not a later call's
      sError("%s, failed to copy file:%s while restore file since %s", pPeer->id, minfo.name, strerror(errno));
      break;
    }

    fileChanged = true;
    sDebug("%s, %s is received, size:%" PRId64, pPeer->id, minfo.name, minfo.size);
  }

  if (code == 0 && fileChanged) {
    // data file is changed, code shall be set to 1
    // minfo holds the terminating record here, so its fversion is reported
    *fversion = minfo.fversion;
    code = 1;
    sDebug("%s, file changed while restore file", pPeer->id);
  }

  if (code < 0) {
    sError("%s, failed to restore %s since %s", pPeer->id, name, strerror(errno));
  }

  return code;
}

S
Shengliang Guan 已提交
147
/*
 * Receive WAL records from the peer over pPeer->syncFd and hand each one to
 * pNode->writeToCache with TAOS_QTYPE_WAL.
 *
 * Every record is read as a fixed SWalHead followed by pHead->len bytes of
 * content, into a single buffer of SYNC_MAX_SIZE bytes. A head with len == 0
 * ends the transfer successfully. The transfer is aborted when a read fails,
 * when len does not fit the buffer, or when two consecutive records carry the
 * same version.
 *
 * Returns: 0 when the terminating head was received, -1 otherwise (including
 *          allocation failure).
 */
static int32_t syncRestoreWal(SSyncPeer *pPeer) {
  SSyncNode *pNode = pPeer->pSyncNode;
  int32_t    ret, code = -1;
  uint64_t   lastVer = 0;

  SWalHead *pHead = calloc(SYNC_MAX_SIZE, 1);  // size for one record
  if (pHead == NULL) return -1;

  // largest content length that still fits behind the head in the buffer
  const int64_t maxContLen = (int64_t)(SYNC_MAX_SIZE) - (int64_t)sizeof(SWalHead);

  while (1) {
    ret = taosReadMsg(pPeer->syncFd, pHead, sizeof(SWalHead));
    if (ret < 0) {
      sError("%s, failed to read walhead while restore wal since %s", pPeer->id, strerror(errno));
      break;
    }

    if (pHead->len == 0) {
      sDebug("%s, wal is synced over", pPeer->id);
      code = 0;
      break;
    }  // wal sync over

    // len comes from the network; reading it unchecked into pHead->cont would
    // write past the end of the buffer allocated above
    if (pHead->len < 0 || (int64_t)pHead->len > maxContLen) {
      sError("%s, invalid walcont len:%d while restore wal", pPeer->id, pHead->len);
      break;
    }

    ret = taosReadMsg(pPeer->syncFd, pHead->cont, pHead->len);
    if (ret < 0) {
      sError("%s, failed to read walcont, len:%d while restore wal since %s", pPeer->id, pHead->len, strerror(errno));
      break;
    }

    sTrace("%s, restore a record, qtype:wal len:%d hver:%" PRIu64, pPeer->id, pHead->len, pHead->version);

    // the same version twice in a row is treated as a failed transfer
    if (lastVer == pHead->version) {
      sError("%s, failed to restore record, same hver:%" PRIu64 ", wal sync failed", pPeer->id, lastVer);
      break;
    }
    lastVer = pHead->version;

    (*pNode->writeToCache)(pNode->vgId, pHead, TAOS_QTYPE_WAL, NULL);
  }

  if (code < 0) {
    sError("%s, failed to restore wal from syncFd:%d since %s", pPeer->id, pPeer->syncFd, strerror(errno));
  }

  free(pHead);
  return code;
}

S
TD-1617  
Shengliang Guan 已提交
193
/*
 * Treat the bytes at offset as one SWalHead record, pass it to
 * pNode->writeToCache with TAOS_QTYPE_FWD, and return the address just past
 * the record (head plus pHead->len bytes of content).
 */
static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) {
  SWalHead  *pRecord = (SWalHead *)offset;
  SSyncNode *pNode = pPeer->pSyncNode;

  (*pNode->writeToCache)(pNode->vgId, pRecord, TAOS_QTYPE_FWD, NULL);

  // the length is read only after the callback has run
  return offset + (pRecord->len + sizeof(SWalHead));
}

S
Shengliang Guan 已提交
203
/*
 * Replay the forwards stored in pNode->pRecv, in buffer order.
 *
 * A first pass runs without pNode->mutex and consumes the records counted so
 * far. A second pass runs with the mutex held and consumes records that were
 * counted in the meantime, stopping early if pRecv->code is non-zero. The node
 * role is set to TAOS_SYNC_ROLE_SLAVE before the mutex is released.
 *
 * Returns: pRecv->code.
 */
static int32_t syncProcessBufferedFwd(SSyncPeer *pPeer) {
  SSyncNode   *pNode = pPeer->pSyncNode;
  SRecvBuffer *pRecv = pNode->pRecv;
  int32_t      done = 0;

  sDebug("%s, number of buffered forwards:%d", pPeer->id, pRecv->forwards);

  char *cursor = pRecv->buffer;

  // pass 1: no lock held
  for (; done < pRecv->forwards; ++done) {
    cursor = syncProcessOneBufferedFwd(pPeer, cursor);
  }

  pthread_mutex_lock(&pNode->mutex);

  // pass 2: under the lock, pick up whatever was counted since pass 1
  for (; done < pRecv->forwards && pRecv->code == 0; ++done) {
    cursor = syncProcessOneBufferedFwd(pPeer, cursor);
  }

  nodeRole = TAOS_SYNC_ROLE_SLAVE;
  sDebug("%s, finish processing buffered fwds:%d", pPeer->id, done);

  pthread_mutex_unlock(&pNode->mutex);

  return pRecv->code;
}

S
Shengliang Guan 已提交
231
/*
 * Append one record (SWalHead plus pHead->len bytes of content) to the node's
 * receive buffer and bump the forward counter.
 *
 * If the remaining space is smaller than the record, nothing is copied and
 * pRecv->code is set to -1.
 *
 * Returns: -1 when the node has no receive buffer, otherwise pRecv->code.
 */
int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) {
  SRecvBuffer *pRecv = pPeer->pSyncNode->pRecv;
  if (pRecv == NULL) return -1;

  int32_t recLen = pHead->len + sizeof(SWalHead);

  // not enough room left behind the write position
  if (pRecv->bufferSize - (pRecv->offset - pRecv->buffer) < recLen) {
    sError("%s, buffer size:%d is too small", pPeer->id, pRecv->bufferSize);
    pRecv->code = -1;  // set error code
    return pRecv->code;
  }

  memcpy(pRecv->offset, pHead, recLen);
  pRecv->offset += recLen;
  pRecv->forwards++;
  sTrace("%s, fwd is saved into queue, hver:%" PRIu64 " fwds:%d", pPeer->id, pHead->version, pRecv->forwards);

  return pRecv->code;
}

S
Shengliang Guan 已提交
251
/*
 * Release the node's receive buffer: first the data area, when a buffer
 * object exists, then the buffer object itself. Both are released via tfree.
 */
static void syncCloseRecvBuffer(SSyncNode *pNode) {
  SRecvBuffer *pRecv = pNode->pRecv;

  if (pRecv != NULL) {
    tfree(pRecv->buffer);
  }

  tfree(pNode->pRecv);
}

S
Shengliang Guan 已提交
259
/*
 * Replace the node's receive buffer with a fresh, empty one of
 * SYNC_RECV_BUFFER_SIZE bytes. Any existing buffer is closed first.
 *
 * Returns: 0 on success, -1 if either allocation fails (pNode->pRecv is then
 *          left as syncCloseRecvBuffer left it).
 */
static int32_t syncOpenRecvBuffer(SSyncNode *pNode) {
  syncCloseRecvBuffer(pNode);

  SRecvBuffer *pFresh = calloc(1, sizeof(SRecvBuffer));
  if (pFresh == NULL) return -1;

  pFresh->bufferSize = SYNC_RECV_BUFFER_SIZE;
  pFresh->buffer = malloc(pFresh->bufferSize);
  if (pFresh->buffer != NULL) {
    // start writing at the beginning, with no forwards recorded yet
    pFresh->offset = pFresh->buffer;
    pFresh->forwards = 0;

    pNode->pRecv = pFresh;
    return 0;
  }

  free(pFresh);
  return -1;
}

S
Shengliang Guan 已提交
280
/*
 * Run the restore sequence for one peer, stopping at the first failing step:
 *   1. restore files (sstatus TAOS_SYNC_STATUS_FILE),
 *   2. if files changed and a notifyFileSynced callback is set, call it with
 *      the received file version,
 *   3. record the file version as the node version and restore the WAL,
 *   4. replay buffered forwards (sstatus TAOS_SYNC_STATUS_CACHE).
 *
 * Returns: 0 when every step succeeded, -1 otherwise.
 */
static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
  SSyncNode *pNode = pPeer->pSyncNode;
  uint64_t   fversion = 0;

  nodeSStatus = TAOS_SYNC_STATUS_FILE;
  sDebug("%s, start to restore file, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);

  int32_t fileRet = syncRestoreFile(pPeer, &fversion);
  if (fileRet < 0) {
    sError("%s, failed to restore file", pPeer->id);
    return -1;
  }

  // a positive result means data files changed: notify app, and pass the version
  if (fileRet > 0) {
    if (pNode->notifyFileSynced && (*pNode->notifyFileSynced)(pNode->vgId, fversion) < 0) {
      sError("%s, app not in ready state", pPeer->id);
      return -1;
    }
  }

  nodeVersion = fversion;

  sDebug("%s, start to restore wal", pPeer->id);
  int32_t walRet = syncRestoreWal(pPeer);
  if (walRet < 0) {
    sError("%s, failed to restore wal", pPeer->id);
    return -1;
  }

  nodeSStatus = TAOS_SYNC_STATUS_CACHE;
  sDebug("%s, start to insert buffered points, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
  int32_t fwdRet = syncProcessBufferedFwd(pPeer);
  if (fwdRet < 0) {
    sError("%s, failed to insert buffered points", pPeer->id);
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
318
void *syncRestoreData(void *param) {
S
TD-2157  
Shengliang Guan 已提交
319
  SSyncPeer *pPeer = param;
S
Shengliang Guan 已提交
320
  SSyncNode *pNode = pPeer->pSyncNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
321 322 323 324

  taosBlockSIGPIPE();
  __sync_fetch_and_add(&tsSyncNum, 1);

325
  (*pNode->notifyRole)(pNode->vgId, TAOS_SYNC_ROLE_SYNCING);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
326

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
327
  if (syncOpenRecvBuffer(pNode) < 0) {
S
TD-2157  
Shengliang Guan 已提交
328 329
    sError("%s, failed to allocate recv buffer, restart connection", pPeer->id);
    syncRestartConnection(pPeer);
S
TD-1617  
Shengliang Guan 已提交
330 331
  } else {
    if (syncRestoreDataStepByStep(pPeer) == 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
332 333 334 335 336 337 338 339 340 341
      sInfo("%s, it is synced successfully", pPeer->id);
      nodeRole = TAOS_SYNC_ROLE_SLAVE;
      syncBroadcastStatus(pNode);
    } else {
      sError("%s, failed to restore data, restart connection", pPeer->id);
      nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
      syncRestartConnection(pPeer);
    }
  }

342
  (*pNode->notifyRole)(pNode->vgId, nodeRole);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
343

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
344
  nodeSStatus = TAOS_SYNC_STATUS_INIT;
S
TD-2157  
Shengliang Guan 已提交
345 346
  sInfo("%s, sync over, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);

S
TD-1617  
Shengliang Guan 已提交
347
  taosClose(pPeer->syncFd);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
348 349 350 351 352 353
  syncCloseRecvBuffer(pNode);
  __sync_fetch_and_sub(&tsSyncNum, 1);
  syncDecPeerRef(pPeer);

  return NULL;
}