walWrite.c 18.9 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
L
Liu Jicong 已提交
17

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
18
#include "os.h"
S
Shengliang Guan 已提交
19
#include "taoserror.h"
20
#include "tchecksum.h"
S
TD-1895  
Shengliang Guan 已提交
21
#include "tfile.h"
S
Shengliang Guan 已提交
22
#include "walInt.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
23

L
Liu Jicong 已提交
24

L
Liu Jicong 已提交
25
#if 0
S
TD-1882  
Shengliang Guan 已提交
26
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
27

S
Shengliang Guan 已提交
28
int32_t walRenew(void *handle) {
29
  if (handle == NULL) return 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
30

S
TD-1846  
Shengliang Guan 已提交
31 32
  SWal *  pWal = handle;
  int32_t code = 0;
33

L
Liu Jicong 已提交
34 35 36 37
  /*if (pWal->stop) {*/
    /*wDebug("vgId:%d, do not create a new wal file", pWal->vgId);*/
    /*return 0;*/
  /*}*/
S
TD-1894  
Shengliang Guan 已提交
38

39 40
  pthread_mutex_lock(&pWal->mutex);

L
Liu Jicong 已提交
41 42 43
  if (tfValid(pWal->logTfd)) {
    tfClose(pWal->logTfd);
    wDebug("vgId:%d, file:%s, it is closed while renew", pWal->vgId, pWal->logName);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
44 45
  }

L
Liu Jicong 已提交
46 47 48 49 50 51
  /*if (pWal->keep == TAOS_WAL_KEEP) {*/
    /*pWal->fileId = 0;*/
  /*} else {*/
    /*if (walGetNewFile(pWal, &pWal->fileId) != 0) pWal->fileId = 0;*/
    /*pWal->fileId++;*/
  /*}*/
52

L
Liu Jicong 已提交
53 54
  snprintf(pWal->logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->curFileId);
  pWal->logTfd = tfOpenCreateWrite(pWal->logName);
55

L
Liu Jicong 已提交
56
  if (!tfValid(pWal->logTfd)) {
S
TD-1846  
Shengliang Guan 已提交
57
    code = TAOS_SYSTEM_ERROR(errno);
L
Liu Jicong 已提交
58
    wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->logName, strerror(errno));
59
  } else {
L
Liu Jicong 已提交
60
    wDebug("vgId:%d, file:%s, it is created and open while renew", pWal->vgId, pWal->logName);
S
TD-1846  
Shengliang Guan 已提交
61
  }
62

S
TD-1949  
Shengliang Guan 已提交
63 64 65 66 67
  pthread_mutex_unlock(&pWal->mutex);

  return code;
}

S
TD-1949  
Shengliang Guan 已提交
68
void walRemoveOneOldFile(void *handle) {
S
TD-1949  
Shengliang Guan 已提交
69 70
  SWal *pWal = handle;
  if (pWal == NULL) return;
L
Liu Jicong 已提交
71
  /*if (pWal->keep == TAOS_WAL_KEEP) return;*/
L
Liu Jicong 已提交
72
  if (!tfValid(pWal->logTfd)) return;
S
TD-1949  
Shengliang Guan 已提交
73 74 75 76 77

  pthread_mutex_lock(&pWal->mutex);

  // remove the oldest wal file
  int64_t oldFileId = -1;
L
Liu Jicong 已提交
78
  if (walGetOldFile(pWal, pWal->curFileId, WAL_FILE_NUM, &oldFileId) == 0) {
S
TD-1949  
Shengliang Guan 已提交
79 80 81 82 83 84 85
    char walName[WAL_FILE_LEN] = {0};
    snprintf(walName, sizeof(walName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, oldFileId);

    if (remove(walName) < 0) {
      wError("vgId:%d, file:%s, failed to remove since %s", pWal->vgId, walName, strerror(errno));
    } else {
      wInfo("vgId:%d, file:%s, it is removed", pWal->vgId, walName);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
86
    }
S
TD-1652  
Shengliang Guan 已提交
87 88
  }

89
  pthread_mutex_unlock(&pWal->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
90 91
}

S
TD-1949  
Shengliang Guan 已提交
92 93 94 95 96
void walRemoveAllOldFiles(void *handle) {
  if (handle == NULL) return;

  SWal *  pWal = handle;
  int64_t fileId = -1;
S
TD-2087  
Shengliang Guan 已提交
97 98

  pthread_mutex_lock(&pWal->mutex);
S
Shengliang Guan 已提交
99
  
L
Liu Jicong 已提交
100 101
  tfClose(pWal->logTfd);
  wDebug("vgId:%d, file:%s, it is closed before remove all wals", pWal->vgId, pWal->logName);
S
Shengliang Guan 已提交
102

S
TD-1949  
Shengliang Guan 已提交
103
  while (walGetNextFile(pWal, &fileId) >= 0) {
L
Liu Jicong 已提交
104
    snprintf(pWal->logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
S
TD-1949  
Shengliang Guan 已提交
105

L
Liu Jicong 已提交
106 107
    if (remove(pWal->logName) < 0) {
      wError("vgId:%d, wal:%p file:%s, failed to remove since %s", pWal->vgId, pWal, pWal->logName, strerror(errno));
S
TD-1949  
Shengliang Guan 已提交
108
    } else {
L
Liu Jicong 已提交
109
      wInfo("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->logName);
S
TD-1949  
Shengliang Guan 已提交
110 111
    }
  }
S
TD-2087  
Shengliang Guan 已提交
112
  pthread_mutex_unlock(&pWal->mutex);
S
TD-1949  
Shengliang Guan 已提交
113
}
L
Liu Jicong 已提交
114
#endif
115

L
Liu Jicong 已提交
116
/*
 * Advance the committed version of the WAL to ver.
 *
 * ver must lie in [vers.commitVer, vers.lastVer]; otherwise nothing is
 * changed and -1 is returned. Returns 0 on success.
 */
int32_t walCommit(SWal *pWal, int64_t ver) {
  // Invariant on entry: snapshotVer <= commitVer <= lastVer.
  ASSERT(pWal->vers.commitVer >= pWal->vers.snapshotVer);
  ASSERT(pWal->vers.commitVer <= pWal->vers.lastVer);

  if (ver >= pWal->vers.commitVer && ver <= pWal->vers.lastVer) {
    pWal->vers.commitVer = ver;
    return 0;
  }

  return -1;
}

/*
 * Roll the WAL back so that version `ver` and everything after it is
 * discarded; on success vers.lastVer becomes ver - 1.
 *
 * Steps (all under pWal->mutex):
 *   1. If ver lives in an earlier file than the last one, switch the write
 *      position to that file via walChangeFile(), remove the log/idx files
 *      of every later entry and shrink fileInfoSet accordingly.
 *   2. Look up the idx entry of ver in the current file, validate the log
 *      header found at that offset, then truncate both files there.
 *
 * Returns 0 on success (or when ver == vers.lastVer, which is treated as
 * "nothing to do"), -1 on any failure. The mutex is released and the
 * temporary file handles are closed on every exit path.
 */
int32_t walRollback(SWal *pWal, int64_t ver) {
  char        fnameStr[WAL_FILE_LEN];
  int32_t     ret = -1;
  int         code = 0;
  int64_t     idxTfd = -1;
  int64_t     logTfd = -1;
  int64_t     idxOff = 0;
  WalIdxEntry entry;
  SWalHead    head;

  // NOTE(review): rolling back to lastVer is a no-op here even though the
  // success path below sets lastVer to ver - 1 — confirm this is intended.
  if (ver == pWal->vers.lastVer) {
    return 0;
  }
  if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) {
    return -1;
  }

  pthread_mutex_lock(&pWal->mutex);

  // find correct file
  if (ver < walGetLastFileFirstVer(pWal)) {
    // close current files
    tfClose(pWal->writeIdxTfd);
    tfClose(pWal->writeLogTfd);
    // open old files
    code = walChangeFile(pWal, ver);
    if (code != 0) {
      goto _exit;
    }

    // Delete only the files AFTER the new current one: entry writeCur is
    // kept in fileInfoSet below and its files are reopened further down.
    int fileSetSize = taosArrayGetSize(pWal->fileInfoSet);
    for (int i = pWal->writeCur + 1; i < fileSetSize; i++) {
      WalFileInfo *pStale = taosArrayGet(pWal->fileInfoSet, i);
      walBuildLogName(pWal, pStale->firstVer, fnameStr);
      remove(fnameStr);
      walBuildIdxName(pWal, pStale->firstVer, fnameStr);
      remove(fnameStr);
    }
    // pop from fileInfoSet
    taosArraySetSize(pWal->fileInfoSet, pWal->writeCur + 1);
  }

  walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
  idxTfd = tfOpenReadWrite(fnameStr);
  if (idxTfd < 0) {
    goto _exit;
  }

  // Keep offsets 64-bit wide so large files are not truncated to int.
  idxOff = (ver - walGetCurFileFirstVer(pWal)) * WAL_IDX_ENTRY_SIZE;
  if (tfLseek(idxTfd, idxOff, SEEK_SET) < 0) {
    goto _exit;
  }

  // read idx file and get log file pos
  // TODO: change to deserialize function
  if (tfRead(idxTfd, &entry, sizeof(WalIdxEntry)) != sizeof(WalIdxEntry)) {
    goto _exit;
  }
  ASSERT(entry.ver == ver);

  walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
  logTfd = tfOpenReadWrite(fnameStr);
  if (logTfd < 0) {
    goto _exit;
  }
  if (tfLseek(logTfd, entry.offset, SEEK_SET) < 0) {
    goto _exit;
  }

  // validate offset: the header stored there must be intact and carry ver
  ASSERT(tfValid(logTfd));
  if (tfRead(logTfd, &head, sizeof(SWalHead)) != sizeof(SWalHead)) {
    goto _exit;
  }
  code = walValidHeadCksum(&head);
  ASSERT(code == 0);
  if (code != 0) {
    goto _exit;
  }
  if (head.head.version != ver) {
    goto _exit;
  }

  // truncate old files
  if (tfFtruncate(logTfd, entry.offset) < 0) {
    goto _exit;
  }
  if (tfFtruncate(idxTfd, idxOff) < 0) {
    goto _exit;
  }

  pWal->vers.lastVer = ver - 1;
  ((WalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1;
  ((WalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset;
  ret = 0;

_exit:
  // Release the temporary handles opened above, then the lock.
  if (logTfd >= 0) {
    tfClose(logTfd);
  }
  if (idxTfd >= 0) {
    tfClose(idxTfd);
  }
  pthread_mutex_unlock(&pWal->mutex);
  return ret;
}
L
Liu Jicong 已提交
231

L
Liu Jicong 已提交
232
/*
 * Record ver as the version being snapshotted (vers.verInSnapshotting).
 *
 * When cfg.retentionPeriod is 0 the WAL is additionally rolled to a new
 * file via walRoll(); the result of that roll is not inspected.
 * Always returns 0.
 */
int32_t walBeginTakeSnapshot(SWal* pWal, int64_t ver) {
  pWal->vers.verInSnapshotting = ver;

  // check file rolling
  if (pWal->cfg.retentionPeriod != 0) {
    return 0;
  }

  walRoll(pWal);
  return 0;
}

/*
 * Finish the snapshot started by walBeginTakeSnapshot().
 *
 * Promotes vers.verInSnapshotting to vers.snapshotVer, removes the leading
 * WAL files that are covered by the snapshot and match the retention
 * condition, updates firstVer/writeCur/totSize and persists the meta via
 * walWriteMeta().
 *
 * Returns -1 when no snapshot is in progress (verInSnapshotting == -1) or
 * when walWriteMeta() fails, 0 otherwise.
 */
int32_t walEndTakeSnapshot(SWal *pWal) {
  int64_t ver = pWal->vers.verInSnapshotting;
  if (ver == -1) return -1;

  pWal->vers.snapshotVer = ver;
  int ts = taosGetTimestampSec();

  int         deleteCnt = 0;
  int64_t     newTotSize = pWal->totSize;
  WalFileInfo tmp = { .firstVer = ver };  // search key; other fields zeroed

  // find files safe to delete
  // presumably TD_LE yields the last entry whose firstVer <= ver — confirm
  WalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE);
  if (pInfo != NULL) {
    // The matched file itself is fully covered when ver reaches its lastVer.
    if (ver >= pInfo->lastVer) {
      pInfo++;
    }
    // iterate files, until the searched result
    // NOTE(review): matches are only counted; the removal below always takes
    // the first deleteCnt entries, whichever iterations matched.
    for (WalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
      if (pWal->totSize > pWal->cfg.retentionSize ||
          iter->closeTs + pWal->cfg.retentionPeriod > ts) {
        // delete according to file size or close time
        deleteCnt++;
        newTotSize -= iter->fileSize;
      }
    }
  }
  // When the search finds nothing there is no file covered by the snapshot,
  // so deleteCnt stays 0 and only the bookkeeping below runs.

  char fnameStr[WAL_FILE_LEN];
  // remove file
  for (int i = 0; i < deleteCnt; i++) {
    WalFileInfo *pDel = taosArrayGet(pWal->fileInfoSet, i);
    walBuildLogName(pWal, pDel->firstVer, fnameStr);
    remove(fnameStr);
    walBuildIdxName(pWal, pDel->firstVer, fnameStr);
    remove(fnameStr);
  }

  // make new array, remove files
  taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
  if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
    pWal->vers.firstVer = -1;
  } else {
    pWal->vers.firstVer = ((WalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
  }
  // Index of the last remaining file; -1 when the set is empty.
  pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
  pWal->totSize = newTotSize;
  pWal->vers.verInSnapshotting = -1;

  // save snapshot ver, commit ver
  int code = walWriteMeta(pWal);
  if (code != 0) {
    return -1;
  }

  return 0;
}

L
Liu Jicong 已提交
298
/*
 * Switch the WAL to a new pair of idx/log files whose first version is
 * vers.lastVer + 1.
 *
 * The currently open write handles (if any) are closed, the new files are
 * created with tfOpenCreateWrite(), a new entry is appended through
 * walRollFileInfo(), and the write position, status and lastRollSeq are
 * updated.
 *
 * Returns 0 on success, -1 on failure. On failure no newly opened handle
 * is leaked, and handles that were closed are reset to -1 in pWal.
 */
int walRoll(SWal *pWal) {
  int code = 0;
  if (pWal->writeIdxTfd != -1) {
    code = tfClose(pWal->writeIdxTfd);
    if (code != 0) {
      return -1;
    }
    // -1 is the "not open" sentinel tested above; do not keep a closed handle.
    pWal->writeIdxTfd = -1;
  }
  if (pWal->writeLogTfd != -1) {
    code = tfClose(pWal->writeLogTfd);
    if (code != 0) {
      return -1;
    }
    pWal->writeLogTfd = -1;
  }

  int64_t idxTfd, logTfd;
  // create new file
  int64_t newFileFirstVersion = pWal->vers.lastVer + 1;
  char    fnameStr[WAL_FILE_LEN];

  walBuildIdxName(pWal, newFileFirstVersion, fnameStr);
  idxTfd = tfOpenCreateWrite(fnameStr);
  if (idxTfd < 0) {
    ASSERT(0);
    return -1;
  }

  walBuildLogName(pWal, newFileFirstVersion, fnameStr);
  logTfd = tfOpenCreateWrite(fnameStr);
  if (logTfd < 0) {
    ASSERT(0);
    tfClose(idxTfd);  // do not leak the idx handle opened just above
    return -1;
  }

  code = walRollFileInfo(pWal);
  if (code != 0) {
    ASSERT(0);
    tfClose(logTfd);
    tfClose(idxTfd);
    return -1;
  }

  // switch file
  pWal->writeIdxTfd = idxTfd;
  pWal->writeLogTfd = logTfd;
  pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;

  // change status: the fresh file is writable and so is its position.
  // NOTE(review): the flags are combined with '|'; the previous '&' of two
  // flag constants looks unintended (yields 0 for distinct bits) — confirm
  // against the flag definitions.
  pWal->curStatus = WAL_CUR_FILE_WRITABLE | WAL_CUR_POS_WRITABLE;

  pWal->lastRollSeq = walGetSeq();
  return 0;
}

L
Liu Jicong 已提交
345
/*
 * Append one {ver, offset} entry to the currently open idx file
 * (pWal->writeIdxTfd).
 *
 * Returns 0 when exactly sizeof(WalIdxEntry) bytes were written, -1
 * otherwise. A short write is not rolled back (TODO: truncate).
 */
static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
  WalIdxEntry idxEntry = { .ver = ver, .offset = offset };

  int written = tfWrite(pWal->writeIdxTfd, &idxEntry, sizeof(WalIdxEntry));

  return (written == sizeof(WalIdxEntry)) ? 0 : -1;
}

L
Liu Jicong 已提交
355
/*
 * Append one record (header + body) with version `index` to the WAL.
 *
 * pWal     WAL handle; NULL yields -1.
 * index    version of the record; must equal vers.lastVer + 1, otherwise
 *          the write is rejected with -1 (no skipping or rewriting).
 * msgType  stored in the record header.
 * body     payload bytes, bodyLen of them are written after the header.
 *
 * Before writing, a new file is rolled when the file set is empty, when
 * cfg.rollPeriod has elapsed since lastRollSeq, or when the last file
 * exceeds cfg.segSize.
 *
 * Returns 0 on success (also when cfg.level is TAOS_WAL_NOLOG, in which
 * case nothing is written), -1 on rejected index, failed roll or failed
 * index write, and TAOS_SYSTEM_ERROR(errno) when writing the header or
 * body fails. The mutex taken for the write is released on every path and
 * the in-memory bookkeeping is only advanced after all writes succeeded.
 */
int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, int32_t bodyLen) {
  if (pWal == NULL) return -1;

  int code = 0;

  // no wal
  if (pWal->cfg.level == TAOS_WAL_NOLOG) return 0;

  // reject skip log or rewrite log
  // must truncate explicitly first
  if (index != pWal->vers.lastVer + 1) {
    return -1;
  }

  if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
    // very first record: open the first file
    pWal->vers.firstVer = index;
    code = walRoll(pWal);
    ASSERT(code == 0);
    if (code != 0) return -1;
  } else {
    int64_t passed = walGetSeq() - pWal->lastRollSeq;
    int     needRoll = 0;
    if (pWal->cfg.rollPeriod != -1 && pWal->cfg.rollPeriod != 0 && passed > pWal->cfg.rollPeriod) {
      needRoll = 1;
    } else if (pWal->cfg.segSize != -1 && pWal->cfg.segSize != 0 && walGetLastFileSize(pWal) > pWal->cfg.segSize) {
      needRoll = 1;
    }
    // walRoll closes the old handles first, so after a failed roll there is
    // nothing valid to write to: report the failure instead of continuing.
    if (needRoll && walRoll(pWal) != 0) {
      return -1;
    }
  }

  pthread_mutex_lock(&pWal->mutex);

  pWal->writeHead.head.version = index;
  pWal->writeHead.head.len = bodyLen;
  pWal->writeHead.head.msgType = msgType;
  pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead);
  pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen);

  if (tfWrite(pWal->writeLogTfd, &pWal->writeHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
    // TODO: ftruncate the partial header
    code = TAOS_SYSTEM_ERROR(errno);
    wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno));
    goto _exit;
  }

  // Write the payload itself (body), not the bytes of the pointer variable.
  if (tfWrite(pWal->writeLogTfd, body, bodyLen) != bodyLen) {
    // TODO: ftruncate the partial record
    code = TAOS_SYSTEM_ERROR(errno);
    wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno));
    goto _exit;
  }

  // NOTE(review): assumes walGetCurFileOffset() still reports the offset at
  // which this record starts, since fileSize is only advanced below — confirm.
  if (walWriteIndex(pWal, index, walGetCurFileOffset(pWal)) != 0) {
    // TODO: undo the log write
    code = -1;
    goto _exit;
  }

  // set status
  pWal->vers.lastVer = index;
  pWal->totSize += sizeof(SWalHead) + bodyLen;
  walGetCurFileInfo(pWal)->lastVer = index;
  walGetCurFileInfo(pWal)->fileSize += sizeof(SWalHead) + bodyLen;

_exit:
  pthread_mutex_unlock(&pWal->mutex);

  return code;
}

L
Liu Jicong 已提交
418
/*
 * Flush the current log file handle with tfFsync().
 *
 * The sync is performed when forceFsync is set, or when cfg.level is
 * TAOS_WAL_FSYNC and cfg.fsyncPeriod is 0. A failed sync is only logged.
 */
void walFsync(SWal *pWal, bool forceFsync) {
  if (!forceFsync && !(pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
    return;
  }

  wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));

  if (tfFsync(pWal->writeLogTfd) >= 0) {
    return;
  }

  wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal), strerror(errno));
}

L
Liu Jicong 已提交
427
#if 0
S
TD-1918  
Shengliang Guan 已提交
428
/*
 * Replay every WAL file enumerated by walGetNextFile() through writeFp,
 * then make sure a log file is open for subsequent writes.
 *
 * handle   WAL handle; NULL yields -1.
 * pVnode   opaque pointer handed through to walRestoreWalFile().
 * writeFp  callback handed through to walRestoreWalFile().
 *
 * A file that fails to restore is logged and skipped. When no file was
 * restored successfully the WAL is renewed via walRenew() and its result
 * is returned. Otherwise the file named after pWal->curFileId is opened
 * with tfOpenCreateWriteAppend(); a failed open returns
 * TAOS_SYSTEM_ERROR(errno), success returns TSDB_CODE_SUCCESS.
 */
int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
  if (handle == NULL) return -1;

  SWal   *pWal = handle;
  int32_t count = 0;
  int32_t code = 0;
  int64_t fileId = -1;

  while ((code = walGetNextFile(pWal, &fileId)) >= 0) {
    char walName[WAL_FILE_LEN];
    // Bound the write by the destination buffer, not by pWal->logName.
    snprintf(walName, sizeof(walName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);

    wInfo("vgId:%d, file:%s, will be restored", pWal->vgId, walName);
    code = walRestoreWalFile(pWal, pVnode, writeFp, walName, fileId);
    if (code != TSDB_CODE_SUCCESS) {
      wError("vgId:%d, file:%s, failed to restore since %s", pWal->vgId, walName, tstrerror(code));
      continue;
    }

    wInfo("vgId:%d, file:%s, restore success, wver:%" PRIu64, pWal->vgId, walName, pWal->curVersion);

    count++;
  }

  if (count == 0) {
    wDebug("vgId:%d, wal file not exist, renew it", pWal->vgId);
    return walRenew(pWal);
  }

  // open the existing WAL file in append mode
  snprintf(pWal->logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->curFileId);
  pWal->logTfd = tfOpenCreateWriteAppend(pWal->logName);
  if (!tfValid(pWal->logTfd)) {
    // Save errno first: the log call below may overwrite it.
    int savedErrno = errno;
    wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->logName, strerror(savedErrno));
    return TAOS_SYSTEM_ERROR(savedErrno);
  }
  wDebug("vgId:%d, file:%s, it is created and open while restore", pWal->vgId, pWal->logName);

  return TSDB_CODE_SUCCESS;
}

S
TD-1846  
Shengliang Guan 已提交
474 475 476
int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) {
  if (handle == NULL) return -1;
  SWal *pWal = handle;
477

S
TD-1846  
Shengliang Guan 已提交
478 479
  if (*fileId == 0) *fileId = -1;

480
  pthread_mutex_lock(&(pWal->mutex));
S
TD-1846  
Shengliang Guan 已提交
481

S
TD-1846  
Shengliang Guan 已提交
482 483 484
  int32_t code = walGetNextFile(pWal, fileId);
  if (code >= 0) {
    sprintf(fileName, "wal/%s%" PRId64, WAL_PREFIX, *fileId);
L
Liu Jicong 已提交
485
    /*code = (*fileId == pWal->curFileId) ? 0 : 1;*/
486
  }
S
TD-1846  
Shengliang Guan 已提交
487

L
Liu Jicong 已提交
488
  wDebug("vgId:%d, get wal file, code:%d curId:%" PRId64 " outId:%" PRId64, pWal->vgId, code, pWal->curFileId, *fileId);
489 490 491
  pthread_mutex_unlock(&(pWal->mutex));

  return code;
S
TD-1652  
Shengliang Guan 已提交
492
}
L
Liu Jicong 已提交
493
#endif
494

L
Liu Jicong 已提交
495 496 497 498 499 500 501 502 503
/*static int walValidateOffset(SWal* pWal, int64_t ver) {*/
  /*int code = 0;*/
  /*SWalHead *pHead = NULL;*/
  /*code = (int)walRead(pWal, &pHead, ver);*/
  /*if(pHead->head.version != ver) {*/
    /*return -1;*/
  /*}*/
  /*return 0;*/
/*}*/
L
Liu Jicong 已提交
504

L
Liu Jicong 已提交
505 506 507 508 509
/*static int64_t walGetOffset(SWal* pWal, int64_t ver) {*/
  /*int code = walSeekVer(pWal, ver);*/
  /*if(code != 0) {*/
    /*return -1;*/
  /*}*/
L
Liu Jicong 已提交
510

L
Liu Jicong 已提交
511 512 513 514
  /*code = walValidateOffset(pWal, ver);*/
  /*if(code != 0) {*/
    /*return -1;*/
  /*}*/
L
Liu Jicong 已提交
515

L
Liu Jicong 已提交
516 517
  /*return 0;*/
/*}*/
L
Liu Jicong 已提交
518

L
Liu Jicong 已提交
519
#if 0
S
TD-1895  
Shengliang Guan 已提交
520
/*
 * Scan forward byte by byte from *offset + 1 looking for the next record
 * that can be validated.
 *
 * At each position a SWalHead is read into pHead. Positions whose
 * signature is not WAL_SIGNATURE, or whose sver is below 1, are skipped.
 * Otherwise the body (pHead->len bytes) is read into pHead->cont and the
 * record is checked with walValidateChecksum(); on success *offset is set
 * to that position.
 *
 * Returns TSDB_CODE_SUCCESS when such a record is found, and
 * TSDB_CODE_WAL_FILE_CORRUPTED when seeking fails or the file ends first.
 */
static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) {
  for (int64_t pos = *offset + 1;; pos++) {
    if (tfLseek(tfd, pos, SEEK_SET) < 0) {
      wError("vgId:%d, failed to seek from corrupted wal file since %s", pWal->vgId, strerror(errno));
      return TSDB_CODE_WAL_FILE_CORRUPTED;
    }

    if (tfRead(tfd, pHead, sizeof(SWalHead)) <= 0) {
      wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos);
      return TSDB_CODE_WAL_FILE_CORRUPTED;
    }

    // Not the start of a record: try the next byte.
    if (pHead->signature != WAL_SIGNATURE) {
      continue;
    }

    // Only sver >= 1 records are validated here; others are stepped over.
    if (pHead->sver < 1) {
      continue;
    }

    if (tfRead(tfd, pHead->cont, pHead->len) < pHead->len) {
      wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos);
      return TSDB_CODE_WAL_FILE_CORRUPTED;
    }

    if (walValidateChecksum(pHead)) {
      wInfo("vgId:%d, wal whole cksum check passed, offset:%" PRId64, pWal->vgId, pos);
      *offset = pos;
      return TSDB_CODE_SUCCESS;
    }
  }

  // Not reached: the loop above only exits through a return.
  return TSDB_CODE_WAL_FILE_CORRUPTED;
}
555

S
TD-1882  
Shengliang Guan 已提交
556
/*
 * Replay one WAL file: read it record by record and hand every record to
 * writeFp(pVnode, pHead).
 *
 * pWal     WAL handle; curVersion is updated to each replayed record.
 * pVnode   opaque pointer passed to writeFp.
 * writeFp  callback invoked once per record.
 * name     path of the file to open (tfOpenReadWrite).
 * fileId   id of the file, used for tracing only.
 *
 * A WAL_MAX_SIZE scratch buffer holds the header plus body of the record
 * being processed. When a header or whole-record check fails,
 * walSkipCorruptedRecord() is used to resynchronise; if that fails too,
 * the file is truncated at the current offset via walFtruncate() and the
 * loop stops.
 *
 * Returns TSDB_CODE_SUCCESS, the code of a failed resynchronisation, or
 * TAOS_SYSTEM_ERROR(errno) for allocation/open/read failures. errno is
 * captured immediately after the failing call so that logging and freeing
 * cannot change the reported code.
 */
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId) {
  int32_t size = WAL_MAX_SIZE;
  void   *buffer = malloc(size);
  if (buffer == NULL) {
    int savedErrno = errno;
    wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(savedErrno));
    return TAOS_SYSTEM_ERROR(savedErrno);
  }

  int64_t tfd = tfOpenReadWrite(name);
  if (!tfValid(tfd)) {
    int savedErrno = errno;
    wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(savedErrno));
    tfree(buffer);
    return TAOS_SYSTEM_ERROR(savedErrno);
  } else {
    wDebug("vgId:%d, file:%s, open for restore", pWal->vgId, name);
  }

  int32_t   code = TSDB_CODE_SUCCESS;
  int64_t   offset = 0;  // start of the record currently being processed
  SWalHead *pHead = buffer;

  while (1) {
    int32_t ret = (int32_t)tfRead(tfd, pHead, sizeof(SWalHead));
    if (ret == 0) break;  // clean end of file

    if (ret < 0) {
      int savedErrno = errno;
      wError("vgId:%d, file:%s, failed to read wal head since %s", pWal->vgId, name, strerror(savedErrno));
      code = TAOS_SYSTEM_ERROR(savedErrno);
      break;
    }

    // Partial header at the tail: cut it off and stop.
    if (ret < sizeof(SWalHead)) {
      wError("vgId:%d, file:%s, failed to read wal head, ret is %d", pWal->vgId, name, ret);
      walFtruncate(pWal, tfd, offset);
      break;
    }

    // sver 0 records are validated on the header alone; sver outside 0..2 is invalid.
    if ((pHead->sver == 0 && !walValidateChecksum(pHead)) || pHead->sver < 0 || pHead->sver > 2) {
      wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
             pHead->version, pHead->len, offset);
      code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
      if (code != TSDB_CODE_SUCCESS) {
        walFtruncate(pWal, tfd, offset);
        break;
      }
    }

    // The body must fit in what is left of the scratch buffer.
    if (pHead->len < 0 || pHead->len > size - sizeof(SWalHead)) {
      wError("vgId:%d, file:%s, wal head len out of range, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
             pHead->version, pHead->len, offset);
      code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
      if (code != TSDB_CODE_SUCCESS) {
        walFtruncate(pWal, tfd, offset);
        break;
      }
    }

    ret = (int32_t)tfRead(tfd, pHead->cont, pHead->len);
    if (ret < 0) {
      int savedErrno = errno;
      wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(savedErrno));
      code = TAOS_SYSTEM_ERROR(savedErrno);
      break;
    }

    if (ret < pHead->len) {
      wError("vgId:%d, file:%s, failed to read wal body, ret:%d len:%d", pWal->vgId, name, ret, pHead->len);
      offset += sizeof(SWalHead);
      continue;
    }

    // sver >= 1 records are validated once the body has been read.
    if ((pHead->sver >= 1) && !walValidateChecksum(pHead)) {
      wError("vgId:%d, file:%s, wal whole cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
             pHead->version, pHead->len, offset);
      code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
      if (code != TSDB_CODE_SUCCESS) {
        walFtruncate(pWal, tfd, offset);
        break;
      }
    }

    offset = offset + sizeof(SWalHead) + pHead->len;

    wTrace("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d offset:%" PRId64,
           pWal->vgId, fileId, pHead->version, pWal->curVersion, pHead->len, offset);

    pWal->curVersion = pHead->version;

    (*writeFp)(pVnode, pHead);
  }

  tfClose(tfd);
  tfree(buffer);

  wDebug("vgId:%d, file:%s, it is closed after restore", pWal->vgId, name);
  return code;
}
L
Liu Jicong 已提交
653
#endif