syncRestore.c 9.4 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
17 18 19 20 21 22 23 24 25 26
#include "os.h"
#include "tlog.h"
#include "tutil.h"
#include "ttimer.h"
#include "tsocket.h"
#include "tqueue.h"
#include "twal.h"
#include "tsync.h"
#include "syncInt.h"

S
TD-1617  
Shengliang Guan 已提交
27 28 29 30
/*
 * Remove local files whose index lies in the inclusive range [sindex, eindex].
 *
 * Files are enumerated through the app's getFileInfo callback. Each file it
 * reports is joined onto pNode->path and unlinked; unlink failures are ignored.
 * A negative start index or an empty range does nothing. Enumeration stops
 * early when getFileInfo returns a zero magic (no further file).
 */
static void syncRemoveExtraFile(SSyncPeer *pPeer, int32_t sindex, int32_t eindex) {
  SSyncNode *pNode = pPeer->pSyncNode;
  char       name[TSDB_FILENAME_LEN * 2] = {0};
  char       fname[TSDB_FILENAME_LEN * 3] = {0};
  uint64_t   fversion;
  int64_t    size;

  if (sindex < 0 || eindex < sindex) return;

  // getFileInfo receives &cur and may move it to the index of the file it found
  for (uint32_t cur = (uint32_t)sindex; cur <= (uint32_t)eindex; cur++) {
    name[0] = 0;
    uint32_t magic = (*pNode->getFileInfo)(pNode->ahandle, name, &cur, eindex, &size, &fversion);
    if (magic == 0) break;  // nothing more in range

    snprintf(fname, sizeof(fname), "%s/%s", pNode->path, name);
    (void)remove(fname);
    sDebug("%s, %s is removed", pPeer->id, fname);
  }
}

S
Shengliang Guan 已提交
52
/*
 * Receive data files from the master over pPeer->syncFd.
 *
 * Per file:
 *   1. Read an SFileInfo from the master.
 *   2. Look the same file up locally via getFileInfo.
 *   3. Reply with an SFileAck whose `sync` flag is set when the file is
 *      missing locally or its magic differs.
 *   4. If `sync` is set, receive minfo.size bytes into <pNode->path>/<name>.
 * An SFileInfo with an empty name or a zero magic ends the stream.
 *
 * Local files whose indices fall in the gaps between announced files, or past
 * the last one, are removed via syncRemoveExtraFile.
 *
 * *fversion is set to 0 on entry. If the stream completes and at least one
 * file was rewritten, it becomes the fversion carried by the final
 * (terminating) SFileInfo.
 *
 * Returns 1 if the stream completed and some file was rewritten, 0 if it
 * completed with nothing rewritten, and -1 on any failure.
 */
static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
  SSyncNode *pNode = pPeer->pSyncNode;
  SFileInfo  minfo; memset(&minfo, 0, sizeof(minfo)); /* = {0}; */  // master file info
  SFileInfo  sinfo; memset(&sinfo, 0, sizeof(sinfo)); /* = {0}; */  // slave file info
  SFileAck   fileAck;
  int32_t    code = -1;
  char       name[TSDB_FILENAME_LEN * 2] = {0};
  uint32_t   pindex = 0;    // index in last restore
  bool       fileChanged = false;

  *fversion = 0;
  sinfo.index = 0;
  while (1) {
    // read file info
    int32_t ret = taosReadMsg(pPeer->syncFd, &(minfo), sizeof(minfo));
    if (ret < 0) break;

    // minfo.name comes straight off the wire: terminate it before it is
    // logged, copied into sinfo, handed to getFileInfo or joined into a path
    minfo.name[sizeof(minfo.name) - 1] = 0;

    // if no more file from master, break;
    if (minfo.name[0] == 0 || minfo.magic == 0) {
      sDebug("%s, no more files to restore", pPeer->id);

      // remove extra files after the current index
      syncRemoveExtraFile(pPeer, sinfo.index + 1, TAOS_SYNC_MAX_INDEX);
      code = 0;
      break;
    }

    // remove extra files on slave between the current and last index
    syncRemoveExtraFile(pPeer, pindex + 1, minfo.index - 1);
    pindex = minfo.index;

    // check the file info
    sinfo = minfo;
    sDebug("%s, get file info:%s", pPeer->id, minfo.name);
    sinfo.magic = (*pNode->getFileInfo)(pNode->ahandle, sinfo.name, &sinfo.index, TAOS_SYNC_MAX_INDEX, &sinfo.size,
                                        &sinfo.fversion);

    // if file not there or magic is not the same, file shall be synced
    memset(&fileAck, 0, sizeof(fileAck));
    fileAck.sync = (sinfo.magic != minfo.magic || sinfo.name[0] == 0) ? 1 : 0;

    // send file ack
    ret = taosWriteMsg(pPeer->syncFd, &(fileAck), sizeof(fileAck));
    if (ret < 0) break;

    // if sync is not required, continue
    if (fileAck.sync == 0) {
      sDebug("%s, %s is the same", pPeer->id, minfo.name);
      continue;
    }

    // sync is required: receive the file from master into <path>/<name>
    // NOTE(review): minfo.name is not checked for '/' or ".." components before
    // being joined onto pNode->path; confirm the master is trusted here.
    snprintf(name, sizeof(name), "%s/%s", pNode->path, minfo.name);

    // NOTE(review): mode is 0777 before umask; consider narrowing for data files
    int32_t dfd = open(name, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
    if (dfd < 0) {
      sError("%s, failed to open file:%s", pPeer->id, name);
      break;
    }

    ret = taosCopyFds(pPeer->syncFd, dfd, minfo.size);
    // keep the copy's errno so fsync()/close() cannot mask it in the error log below
    int32_t copyErrno = errno;
    fsync(dfd);
    close(dfd);
    if (ret < 0) {
      errno = copyErrno;
      break;
    }

    fileChanged = true;
    sDebug("%s, %s is received, size:%" PRId64, pPeer->id, minfo.name, minfo.size);
  }

  if (code == 0 && fileChanged) {
    // data file is changed, code shall be set to 1
    *fversion = minfo.fversion;
    code = 1;
  }

  if (code < 0) {
    sError("%s, failed to restore %s(%s)", pPeer->id, name, strerror(errno));
  }

  return code;
}

S
Shengliang Guan 已提交
136
/*
 * Receive WAL records from the master over pPeer->syncFd and pass each one to
 * the app's writeToCache callback with queue type TAOS_QTYPE_WAL.
 *
 * Each record is an SWalHead followed by pHead->len bytes of content. A head
 * with len == 0 marks the end of the WAL stream.
 *
 * Returns 0 once the end marker is received. Returns -1 on:
 *   - allocation failure,
 *   - a read failure,
 *   - a record length that does not fit the SYNC_MAX_SIZE receive buffer,
 *   - two consecutive records carrying the same version.
 */
static int32_t syncRestoreWal(SSyncPeer *pPeer) {
  SSyncNode *pNode = pPeer->pSyncNode;
  int32_t    ret, code = -1;

  void *buffer = calloc(SYNC_MAX_SIZE, 1);  // size for one record
  if (buffer == NULL) return -1;

  SWalHead *pHead = (SWalHead *)buffer;
  uint64_t  lastVer = 0;

  while (1) {
    ret = taosReadMsg(pPeer->syncFd, pHead, sizeof(SWalHead));
    if (ret < 0) break;

    if (pHead->len == 0) {
      code = 0;
      break;
    }  // wal sync over

    // len is supplied by the peer: the content must fit after the head in
    // `buffer`, otherwise the read below would overflow it
    if (pHead->len < 0 || (size_t)pHead->len > SYNC_MAX_SIZE - sizeof(SWalHead)) {
      sError("%s, invalid wal record len:%d, wal sync failed", pPeer->id, pHead->len);
      break;
    }

    ret = taosReadMsg(pPeer->syncFd, pHead->cont, pHead->len);
    if (ret < 0) break;

    sTrace("%s, restore a record, qtype:wal len:%d hver:%" PRIu64, pPeer->id, pHead->len, pHead->version);

    // a repeated version means the stream is not advancing; give up
    if (lastVer == pHead->version) {
      sError("%s, failed to restore record, same hver:%" PRIu64 ", wal sync failed", pPeer->id, lastVer);
      break;
    }
    lastVer = pHead->version;

    (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_WAL, NULL);
  }

  if (code < 0) {
    sError("%s, failed to restore wal(%s)", pPeer->id, strerror(errno));
  }

  free(buffer);
  return code;
}

S
TD-1617  
Shengliang Guan 已提交
177
/*
 * Pass the buffered forward record at `offset` to the app's writeToCache
 * callback (queue type TAOS_QTYPE_FWD). Returns a pointer just past the
 * record, i.e. past its SWalHead plus pHead->len content bytes.
 */
static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) {
  SWalHead * pHead = (SWalHead *)offset;
  SSyncNode *pNode = pPeer->pSyncNode;

  (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD, NULL);

  // the record length is read only after the callback has run
  return offset + (pHead->len + sizeof(SWalHead));
}

S
Shengliang Guan 已提交
187
/*
 * Replay every forward buffered in pNode->pRecv, in buffer order.
 *
 * Phase 1 drains the buffer without holding pNode->mutex. Phase 2 then takes
 * the mutex and:
 *   - replays any records appended meanwhile, stopping early if pRecv->code
 *     is non-zero;
 *   - sets this node's role to TAOS_SYNC_ROLE_SLAVE;
 *   - releases the mutex.
 *
 * NOTE(review): phase 1 reads pRecv->forwards while other threads may append
 * to the buffer. Presumably appenders hold pNode->mutex; confirm.
 *
 * Returns pRecv->code.
 */
static int32_t syncProcessBufferedFwd(SSyncPeer *pPeer) {
  SSyncNode *  pNode = pPeer->pSyncNode;
  SRecvBuffer *pRecv = pNode->pRecv;
  int32_t      done = 0;

  sDebug("%s, number of buffered forwards:%d", pPeer->id, pRecv->forwards);

  char *cursor = pRecv->buffer;

  // phase 1: unlocked drain
  for (; done < pRecv->forwards; ++done) {
    cursor = syncProcessOneBufferedFwd(pPeer, cursor);
  }

  // phase 2: finish under the lock and switch role
  pthread_mutex_lock(&pNode->mutex);

  for (; done < pRecv->forwards && pRecv->code == 0; ++done) {
    cursor = syncProcessOneBufferedFwd(pPeer, cursor);
  }

  nodeRole = TAOS_SYNC_ROLE_SLAVE;
  sDebug("%s, finish processing buffered fwds:%d", pPeer->id, done);

  pthread_mutex_unlock(&pNode->mutex);

  return pRecv->code;
}

S
Shengliang Guan 已提交
215
/*
 * Append one forwarded WAL record (SWalHead plus pHead->len content bytes) to
 * the node's receive buffer.
 *
 * Returns -1 if no receive buffer is open. Otherwise returns pRecv->code:
 *   - 0 while every record so far has fit;
 *   - -1 once a record did not fit.
 * That error is sticky: nothing in this file resets it.
 *
 * After a record has been dropped, later records are refused as well.
 * Appending them would leave a hole in the buffered sequence, and the
 * unlocked replay loop in syncProcessBufferedFwd does not consult pRecv->code,
 * so it would apply records after the missing one.
 */
int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) {
  SSyncNode *  pNode = pPeer->pSyncNode;
  SRecvBuffer *pRecv = pNode->pRecv;

  if (pRecv == NULL) return -1;

  // a previous record was dropped: keep the buffer a gap-free prefix
  if (pRecv->code != 0) return pRecv->code;

  int32_t len = pHead->len + sizeof(SWalHead);

  if (pRecv->bufferSize - (pRecv->offset - pRecv->buffer) >= len) {
    memcpy(pRecv->offset, pHead, len);
    pRecv->offset += len;
    pRecv->forwards++;
    sTrace("%s, fwd is saved into queue, hver:%" PRIu64 " fwds:%d", pPeer->id, pHead->version, pRecv->forwards);
  } else {
    sError("%s, buffer size:%d is too small", pPeer->id, pRecv->bufferSize);
    pRecv->code = -1;  // set error code
  }

  return pRecv->code;
}

S
Shengliang Guan 已提交
235
/*
 * Release the node's receive buffer: first its data area (only when a
 * descriptor exists), then the SRecvBuffer descriptor itself.
 */
static void syncCloseRecvBuffer(SSyncNode *pNode) {
  SRecvBuffer *pRecv = pNode->pRecv;

  if (pRecv != NULL) tfree(pRecv->buffer);

  tfree(pNode->pRecv);
}

S
Shengliang Guan 已提交
243
/*
 * (Re)create pNode->pRecv with a data area of SYNC_RECV_BUFFER_SIZE bytes.
 *
 * Any existing buffer is released first via syncCloseRecvBuffer.
 * Returns 0 on success, or -1 if either allocation fails. On failure nothing
 * is leaked and pNode->pRecv is not assigned.
 */
static int32_t syncOpenRecvBuffer(SSyncNode *pNode) {
  syncCloseRecvBuffer(pNode);

  SRecvBuffer *pRecv = calloc(1, sizeof(*pRecv));
  if (pRecv == NULL) return -1;

  pRecv->bufferSize = SYNC_RECV_BUFFER_SIZE;
  pRecv->buffer = malloc(pRecv->bufferSize);
  if (pRecv->buffer == NULL) {
    free(pRecv);
    return -1;
  }

  // calloc already zero-filled forwards and code; only the write cursor needs setting
  pRecv->offset = pRecv->buffer;

  pNode->pRecv = pRecv;
  return 0;
}

S
Shengliang Guan 已提交
264
/*
 * Run the restore phases in order:
 *   1. data files (syncRestoreFile);
 *   2. the WAL (syncRestoreWal);
 *   3. the forwards buffered in pNode->pRecv (syncProcessBufferedFwd).
 * nodeSStatus is set to FILE before phase 1 and to CACHE before phase 3.
 *
 * If any file was rewritten and the app registered notifyFileSynced, the app
 * is told the new file version. A negative reply aborts the restore.
 * nodeVersion is updated to the file version before the WAL phase.
 *
 * Returns 0 on success, -1 as soon as any step fails.
 */
static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
  SSyncNode *pNode = pPeer->pSyncNode;
  uint64_t   fversion = 0;

  nodeSStatus = TAOS_SYNC_STATUS_FILE;
  sDebug("%s, start to restore file", pPeer->id);

  int32_t fileResult = syncRestoreFile(pPeer, &fversion);
  if (fileResult < 0) {
    sError("%s, failed to restore file", pPeer->id);
    return -1;
  }

  // a positive result means at least one data file was rewritten
  bool filesChanged = (fileResult > 0);
  if (filesChanged && pNode->notifyFileSynced != NULL &&
      (*pNode->notifyFileSynced)(pNode->ahandle, fversion) < 0) {
    sError("%s, app not in ready state", pPeer->id);
    return -1;
  }

  nodeVersion = fversion;

  sDebug("%s, start to restore wal", pPeer->id);
  if (syncRestoreWal(pPeer) < 0) {
    sError("%s, failed to restore wal", pPeer->id);
    return -1;
  }

  nodeSStatus = TAOS_SYNC_STATUS_CACHE;
  sDebug("%s, start to insert buffered points", pPeer->id);
  if (syncProcessBufferedFwd(pPeer) < 0) {
    sError("%s, failed to insert buffered points", pPeer->id);
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
302
void *syncRestoreData(void *param) {
S
TD-2157  
Shengliang Guan 已提交
303
  SSyncPeer *pPeer = param;
S
Shengliang Guan 已提交
304
  SSyncNode *pNode = pPeer->pSyncNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
305 306 307 308

  taosBlockSIGPIPE();
  __sync_fetch_and_add(&tsSyncNum, 1);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
309 310
  (*pNode->notifyRole)(pNode->ahandle, TAOS_SYNC_ROLE_SYNCING);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
311
  if (syncOpenRecvBuffer(pNode) < 0) {
S
TD-2157  
Shengliang Guan 已提交
312 313
    sError("%s, failed to allocate recv buffer, restart connection", pPeer->id);
    syncRestartConnection(pPeer);
S
TD-1617  
Shengliang Guan 已提交
314 315
  } else {
    if (syncRestoreDataStepByStep(pPeer) == 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
316 317 318 319 320 321 322 323 324 325
      sInfo("%s, it is synced successfully", pPeer->id);
      nodeRole = TAOS_SYNC_ROLE_SLAVE;
      syncBroadcastStatus(pNode);
    } else {
      sError("%s, failed to restore data, restart connection", pPeer->id);
      nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
      syncRestartConnection(pPeer);
    }
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
326 327
  (*pNode->notifyRole)(pNode->ahandle, nodeRole);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
328
  nodeSStatus = TAOS_SYNC_STATUS_INIT;
S
TD-1617  
Shengliang Guan 已提交
329
  taosClose(pPeer->syncFd);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
330 331 332 333 334 335
  syncCloseRecvBuffer(pNode);
  __sync_fetch_and_sub(&tsSyncNum, 1);
  syncDecPeerRef(pPeer);

  return NULL;
}