walMeta.c 28.1 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
L
Liu Jicong 已提交
17 18
#include "os.h"
#include "taoserror.h"
L
Liu Jicong 已提交
19
#include "tutil.h"
L
Liu Jicong 已提交
20 21
#include "walInt.h"

22 23 24 25
/*
 * Tell whether version `ver` falls inside the range currently held by the WAL.
 * Returns false when the WAL is empty, or when `ver` is outside
 * [walGetFirstVer(pWal), walGetLastVer(pWal)].
 */
bool FORCE_INLINE walLogExist(SWal* pWal, int64_t ver) {
  if (walIsEmpty(pWal)) {
    return false;
  }
  if (walGetFirstVer(pWal) > ver) {
    return false;
  }
  return walGetLastVer(pWal) >= ver;
}

L
Liu Jicong 已提交
26 27
// The WAL is considered empty when its first version equals the -1 sentinel.
bool FORCE_INLINE walIsEmpty(SWal* pWal) {
  const bool noFirstVer = (pWal->vers.firstVer == -1);
  return noFirstVer;
}

L
Liu Jicong 已提交
28
// Accessor: first version recorded in pWal->vers.
int64_t FORCE_INLINE walGetFirstVer(SWal* pWal) {
  const int64_t firstVer = pWal->vers.firstVer;
  return firstVer;
}
L
Liu Jicong 已提交
29

L
Liu Jicong 已提交
30
// Accessor: snapshot version recorded in pWal->vers.
int64_t FORCE_INLINE walGetSnaphostVer(SWal* pWal) {
  const int64_t snapshotVer = pWal->vers.snapshotVer;
  return snapshotVer;
}
L
Liu Jicong 已提交
31

L
Liu Jicong 已提交
32
// Accessor: last version recorded in pWal->vers.
int64_t FORCE_INLINE walGetLastVer(SWal* pWal) {
  const int64_t lastVer = pWal->vers.lastVer;
  return lastVer;
}
L
Liu Jicong 已提交
33

L
Liu Jicong 已提交
34 35
// Accessor: committed version recorded in pWal->vers.
int64_t FORCE_INLINE walGetCommittedVer(SWal* pWal) {
  const int64_t commitVer = pWal->vers.commitVer;
  return commitVer;
}

L
Liu Jicong 已提交
36 37
// Accessor: applied version recorded in pWal->vers.
int64_t FORCE_INLINE walGetAppliedVer(SWal* pWal) {
  const int64_t appliedVer = pWal->vers.appliedVer;
  return appliedVer;
}

38 39 40 41 42 43
/*
 * Format the path of the meta file with version `metaVer` ("<path>/meta-ver<N>")
 * into `buf` and return the number of characters written (sprintf result).
 * The write is unbounded: the caller must supply a buffer large enough for the
 * whole path.
 */
static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
  const int written = sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
  return written;
}

/*
 * Format the path of the temporary meta file ("<path>/meta-ver.tmp") into `buf`
 * and return the number of characters written (sprintf result).
 * The write is unbounded: the caller must supply a buffer large enough for the
 * whole path.
 */
static FORCE_INLINE int walBuildTmpMetaName(SWal* pWal, char* buf) {
  const int written = sprintf(buf, "%s/meta-ver.tmp", pWal->path);
  return written;
}

46
/*
 * Scan the log file at index `fileIdx` of pWal->fileInfoSet for the last WAL
 * entry whose header and body checksums both validate, truncate the file right
 * after that entry, and store the resulting size in the file info.
 *
 * The scan first tries the range [recorded fileSize, actual file size) when the
 * file grew beyond what the meta recorded, then walks backwards in windows of
 * at most WAL_SCAN_BUF_SIZE bytes until an entry is found or offset 0 is hit.
 *
 * Returns the version of the last valid entry. Returns -1 with terrno set to
 * TSDB_CODE_WAL_LOG_NOT_EXIST when no valid entry exists (the file is then
 * truncated to 0 bytes), or -1 with another terrno on stat/open/seek/read/
 * truncate/fsync failure, allocation failure, or when a scan window exceeds
 * WAL_RECOV_SIZE_LIMIT.
 */
static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
  int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
  terrno = TSDB_CODE_SUCCESS;
  ASSERT(fileIdx >= 0 && fileIdx < sz);

  SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
  char          fnameStr[WAL_FILE_LEN];
  walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);

  // A failed stat must not be mistaken for an empty file: an "empty" result
  // makes the scan report "no log", which callers may act on destructively.
  int64_t fileSize = 0;
  if (taosStatFile(fnameStr, &fileSize, NULL) < 0) {
    wError("vgId:%d, failed to stat file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE);
  if (pFile == NULL) {
    wError("vgId:%d, failed to open file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  // ensure size as non-negative
  pFileInfo->fileSize = TMAX(0, pFileInfo->fileSize);

  uint64_t magic = WAL_MAGIC;
  int64_t  walCkHeadSz = sizeof(SWalCkHead);
  int64_t  end = fileSize;
  int64_t  offset = 0;
  int64_t  capacity = 0;
  int64_t  readSize = 0;
  char*    buf = NULL;
  // Positions are kept as offsets into buf rather than pointers, because buf
  // may be moved by a realloc while a candidate entry is being validated.
  int64_t  foundPos = -1;
  bool     firstTrial = pFileInfo->fileSize < fileSize;

  // search for the valid last WAL entry, e.g. block by block
  while (1) {
    offset = (firstTrial) ? pFileInfo->fileSize : TMAX(0, end - WAL_SCAN_BUF_SIZE);
    ASSERT(offset <= end);
    readSize = end - offset;
    capacity = readSize + sizeof(magic);

    int64_t limit = WAL_RECOV_SIZE_LIMIT;
    if (limit < readSize) {
      wError("vgId:%d, possibly corrupted WAL range exceeds size limit (i.e. %" PRId64 " bytes). offset:%" PRId64
             ", end:%" PRId64 ", file:%s",
             pWal->cfg.vgId, limit, offset, end, fnameStr);
      terrno = TSDB_CODE_WAL_SIZE_LIMIT;
      goto _err;
    }

    void* ptr = taosMemoryRealloc(buf, capacity);
    if (ptr == NULL) {
      terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
      goto _err;
    }
    buf = ptr;

    int64_t ret = taosLSeekFile(pFile, offset, SEEK_SET);
    if (ret < 0) {
      wError("vgId:%d, failed to lseek file due to %s. offset:%" PRId64 "", pWal->cfg.vgId, strerror(errno), offset);
      terrno = TAOS_SYSTEM_ERROR(errno);
      goto _err;
    }

    if (readSize != taosReadFile(pFile, buf, readSize)) {
      wError("vgId:%d, failed to read file due to %s. readSize:%" PRId64 ", file:%s", pWal->cfg.vgId, strerror(errno),
             readSize, fnameStr);
      terrno = TAOS_SYSTEM_ERROR(errno);
      goto _err;
    }

    char*   candidate = NULL;
    int64_t searchPos = 0;  // where the next magic search starts, relative to buf

    while ((candidate = tmemmem(buf + searchPos, readSize - searchPos, (char*)&magic, sizeof(magic))) != NULL) {
      int64_t candPos = candidate - buf;

      // validate head
      int64_t len = readSize - candPos;
      if (len < walCkHeadSz) {
        break;
      }
      SWalCkHead* logContent = (SWalCkHead*)(buf + candPos);
      if (walValidHeadCksum(logContent) != 0) {
        wWarn("vgId:%d, failed to validate checksum of wal entry header. offset:%" PRId64 ", file:%s", pWal->cfg.vgId,
               offset + candPos, fnameStr);
        searchPos = candPos + 1;
        if (firstTrial) {
          break;
        } else {
          continue;
        }
      }

      // validate body
      int64_t size = walCkHeadSz + logContent->head.bodyLen;
      if (len < size) {
        // the body extends past the scan window: read the remainder behind it
        int64_t extraSize = size - len;
        if (capacity < readSize + extraSize + sizeof(magic)) {
          capacity += extraSize;
          void* ptr = taosMemoryRealloc(buf, capacity);
          if (ptr == NULL) {
            terrno = TSDB_CODE_OUT_OF_MEMORY;
            goto _err;
          }
          buf = ptr;
          // buf may have moved; re-derive the entry pointer from its offset
          logContent = (SWalCkHead*)(buf + candPos);
        }
        int64_t ret = taosLSeekFile(pFile, offset + readSize, SEEK_SET);
        if (ret < 0) {
          wError("vgId:%d, failed to lseek file due to %s. offset:%" PRId64 "", pWal->cfg.vgId, strerror(errno),
                 offset + readSize);
          terrno = TAOS_SYSTEM_ERROR(errno);
          break;
        }
        if (extraSize != taosReadFile(pFile, buf + readSize, extraSize)) {
          wError("vgId:%d, failed to read file due to %s. offset:%" PRId64 ", extraSize:%" PRId64 ", file:%s",
                 pWal->cfg.vgId, strerror(errno), offset + readSize, extraSize, fnameStr);
          terrno = TAOS_SYSTEM_ERROR(errno);
          break;
        }
      }
      if (walValidBodyCksum(logContent) != 0) {
        terrno = TSDB_CODE_WAL_CHKSUM_MISMATCH;
        wWarn("vgId:%d, failed to validate checksum of wal entry body. offset:%" PRId64 ", file:%s", pWal->cfg.vgId,
               offset + candPos, fnameStr);
        searchPos = candPos + 1;
        if (firstTrial) {
          break;
        } else {
          continue;
        }
      }

      // found one; keep searching for a later one in the same window
      foundPos = candPos;
      searchPos = candPos + 1;
    }

    if (foundPos >= 0 || offset == 0) break;

    // go backwards, e.g. by at most one WAL scan buf size
    end = offset + walCkHeadSz - 1;
    firstTrial = false;
  }

  // determine end of last entry
  SWalCkHead* lastEntry = (foundPos >= 0) ? (SWalCkHead*)(buf + foundPos) : NULL;
  int64_t     retVer = -1;
  int64_t     lastEntryBeginOffset = 0;
  int64_t     lastEntryEndOffset = 0;

  if (lastEntry == NULL) {
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
  } else {
    retVer = lastEntry->head.version;
    lastEntryBeginOffset = offset + foundPos;
    lastEntryEndOffset = lastEntryBeginOffset + sizeof(SWalCkHead) + lastEntry->head.bodyLen;
  }

  // truncate file
  if (lastEntryEndOffset != fileSize) {
    wWarn("vgId:%d, repair meta truncate file %s to %" PRId64 ", orig size %" PRId64, pWal->cfg.vgId, fnameStr,
          lastEntryEndOffset, fileSize);
    if (taosFtruncateFile(pFile, lastEntryEndOffset) < 0) {
      wError("failed to truncate file due to %s. file:%s", strerror(errno), fnameStr);
      terrno = TAOS_SYSTEM_ERROR(errno);
      goto _err;
    }
    if (taosFsyncFile(pFile) < 0) {
      wError("failed to fsync file due to %s. file:%s", strerror(errno), fnameStr);
      terrno = TAOS_SYSTEM_ERROR(errno);
      goto _err;
    }
  }
  pFileInfo->fileSize = lastEntryEndOffset;

  taosCloseFile(&pFile);
  taosMemoryFree(buf);
  return retVer;

_err:
  taosCloseFile(&pFile);
  taosMemoryFree(buf);
  return -1;
}

228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259
/*
 * Replace the contents of `metaLogList` with one entry per element of
 * `actualLogList`. An actual entry whose firstVer matches a meta entry takes
 * over that meta entry's whole record; other actual entries are kept as they
 * are. Both lists are expected to be sorted by firstVer in ascending order.
 */
static void walRebuildFileInfoSet(SArray* metaLogList, SArray* actualLogList) {
  const int nMeta = taosArrayGetSize(metaLogList);
  const int nActual = taosArrayGetSize(actualLogList);
  int       metaPos = 0;

  // single merge pass over the two ascending lists
  for (int logPos = 0; logPos < nActual; logPos++) {
    SWalFileInfo* pActual = taosArrayGet(actualLogList, logPos);
    for (; metaPos < nMeta; metaPos++) {
      SWalFileInfo* pKnown = taosArrayGet(metaLogList, metaPos);
      ASSERT(pKnown != NULL);
      if (pKnown->firstVer < pActual->firstVer) {
        // meta entry without a matching file on this side: skip it
        continue;
      }
      if (pKnown->firstVer == pActual->firstVer) {
        *pActual = *pKnown;
        metaPos++;
      }
      break;
    }
  }

  taosArrayClear(metaLogList);

  for (int logPos = 0; logPos < nActual; logPos++) {
    taosArrayPush(metaLogList, taosArrayGet(actualLogList, logPos));
  }
}

/*
 * Bring the version counters in pWal->vers into a consistent relation with
 * snapshotVer: firstVer is capped at snapshotVer + 1, and lastVer / commitVer /
 * appliedVer are raised to at least snapshotVer. Afterwards commitVer is capped
 * at lastVer and appliedVer at commitVer. Each adjustment is logged.
 */
void walAlignVersions(SWal* pWal) {
  const int64_t snapVer = pWal->vers.snapshotVer;  // not modified in this function

  if (pWal->vers.firstVer > snapVer + 1) {
    wWarn("vgId:%d, firstVer:%" PRId64 " is larger than snapshotVer:%" PRId64 " + 1. align with it.", pWal->cfg.vgId,
          pWal->vers.firstVer, snapVer);
    pWal->vers.firstVer = snapVer + 1;
  }

  if (pWal->vers.lastVer < snapVer) {
    wWarn("vgId:%d, lastVer:%" PRId64 " is less than snapshotVer:%" PRId64 ". align with it.", pWal->cfg.vgId,
          pWal->vers.lastVer, snapVer);
    pWal->vers.lastVer = snapVer;
  }

  if (pWal->vers.commitVer < snapVer) {
    wWarn("vgId:%d, commitVer:%" PRId64 " is less than snapshotVer:%" PRId64 ". align with it.", pWal->cfg.vgId,
          pWal->vers.commitVer, snapVer);
    pWal->vers.commitVer = snapVer;
  }

  if (pWal->vers.appliedVer < snapVer) {
    wWarn("vgId:%d, appliedVer:%" PRId64 " is less than snapshotVer:%" PRId64 ". align with it.", pWal->cfg.vgId,
          pWal->vers.appliedVer, snapVer);
    pWal->vers.appliedVer = snapVer;
  }

  // enforce appliedVer <= commitVer <= lastVer
  pWal->vers.commitVer = TMIN(pWal->vers.lastVer, pWal->vers.commitVer);
  pWal->vers.appliedVer = TMIN(pWal->vers.commitVer, pWal->vers.appliedVer);
}

L
Liu Jicong 已提交
285 286 287 288
/*
 * Reconcile pWal->fileInfoSet with the "<digits>.log" files actually present in
 * pWal->path.
 *
 * Steps: list the log files on disk, rebuild the file info set from them
 * (keeping matching meta records), then walk the files from newest to oldest.
 * A file whose recorded size and version range look consistent is accepted as
 * is; any other file is scanned with walScanLogGetLastVer(). Files without a
 * single valid entry are deleted together with their idx file. Finally the
 * write cursor, total size and version counters are reset and the meta file is
 * rewritten if anything changed.
 *
 * Returns 0 on success, -1 on failure.
 */
int walCheckAndRepairMeta(SWal* pWal) {
  // load log files, get first/snapshot/last version info
  // The dot is escaped so that only a literal ".log" suffix matches.
  const char* logPattern = "^[0-9]+\\.log$";
  regex_t     logRegPattern;

  if (regcomp(&logRegPattern, logPattern, REG_EXTENDED) != 0) {
    wError("vgId:%d, failed to compile regex pattern:%s", pWal->cfg.vgId, logPattern);
    return -1;
  }

  TdDirPtr pDir = taosOpenDir(pWal->path);
  if (pDir == NULL) {
    regfree(&logRegPattern);
    wError("vgId:%d, path:%s, failed to open since %s", pWal->cfg.vgId, pWal->path, strerror(errno));
    return -1;
  }

  SArray* actualLog = taosArrayInit(8, sizeof(SWalFileInfo));
  if (actualLog == NULL) {
    taosCloseDir(&pDir);
    regfree(&logRegPattern);
    terrno = TAOS_SYSTEM_ERROR(ENOMEM);
    return -1;
  }

  // scan log files and build new meta
  TdDirEntryPtr pDirEntry;
  while ((pDirEntry = taosReadDir(pDir)) != NULL) {
    char* name = taosDirEntryBaseName(taosGetDirEntryName(pDirEntry));
    int   code = regexec(&logRegPattern, name, 0, NULL, 0);
    if (code == 0) {
      SWalFileInfo fileInfo;
      memset(&fileInfo, -1, sizeof(SWalFileInfo));
      sscanf(name, "%" PRId64 ".log", &fileInfo.firstVer);
      // silently dropping a file here would make it look missing later on
      if (taosArrayPush(actualLog, &fileInfo) == NULL) {
        taosArrayDestroy(actualLog);
        taosCloseDir(&pDir);
        regfree(&logRegPattern);
        terrno = TAOS_SYSTEM_ERROR(ENOMEM);
        return -1;
      }
    }
  }

  taosCloseDir(&pDir);
  regfree(&logRegPattern);

  taosArraySort(actualLog, compareWalFileInfo);

  int     metaFileNum = taosArrayGetSize(pWal->fileInfoSet);
  int     actualFileNum = taosArrayGetSize(actualLog);
  int64_t totSize = 0;
  bool    updateMeta = (metaFileNum != actualFileNum);

  // rebuild meta of file info
  walRebuildFileInfoSet(pWal->fileInfoSet, actualLog);
  taosArrayDestroy(actualLog);

  int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
  ASSERT(sz == actualFileNum);

  // scan and determine the lastVer
  int32_t fileIdx = sz;

  while (--fileIdx >= 0) {
    char          fnameStr[WAL_FILE_LEN];
    int64_t       fileSize = 0;
    SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);

    walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
    int32_t code = taosStatFile(fnameStr, &fileSize, NULL);
    if (code < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      wError("failed to stat file since %s. file:%s", terrstr(), fnameStr);
      return -1;
    }

    ASSERT(pFileInfo->firstVer >= 0);

    // recorded range and size agree with the file on disk: nothing to repair
    if (pFileInfo->lastVer >= pFileInfo->firstVer && fileSize == pFileInfo->fileSize) {
      totSize += pFileInfo->fileSize;
      continue;
    }
    updateMeta = true;

    int64_t lastVer = walScanLogGetLastVer(pWal, fileIdx);
    if (lastVer < 0) {
      if (terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
        wError("failed to scan wal last ver since %s", terrstr());
        return -1;
      }
      ASSERT(pFileInfo->fileSize == 0);
      // remove the empty wal log, and its idx
      taosRemoveFile(fnameStr);
      walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
      taosRemoveFile(fnameStr);
      // remove its meta entry
      taosArrayRemove(pWal->fileInfoSet, fileIdx);
      continue;
    }

    // update lastVer
    pFileInfo->lastVer = lastVer;
    totSize += pFileInfo->fileSize;
  }

  // reset vers info and so on
  actualFileNum = taosArrayGetSize(pWal->fileInfoSet);
  pWal->writeCur = actualFileNum - 1;
  pWal->totSize = totSize;
  pWal->vers.lastVer = -1;
  if (actualFileNum > 0) {
    pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
    pWal->vers.lastVer = ((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer;
  }
  (void)walAlignVersions(pWal);

  // update meta file
  if (updateMeta) {
    (void)walSaveMeta(pWal);
  }
  return 0;
}

/*
 * Read one SWalCkHead from `pLogFile` at byte position `offset` into `pCkHead`
 * and verify its header checksum.
 * Returns 0 on success. Returns -1 with terrno derived from errno when the seek
 * or the read fails, or with TSDB_CODE_WAL_CHKSUM_MISMATCH when the checksum
 * does not validate.
 */
int walReadLogHead(TdFilePtr pLogFile, int64_t offset, SWalCkHead* pCkHead) {
  // the read is only attempted when the seek succeeded
  bool ioOk = taosLSeekFile(pLogFile, offset, SEEK_SET) >= 0 &&
              taosReadFile(pLogFile, pCkHead, sizeof(SWalCkHead)) == sizeof(SWalCkHead);
  if (!ioOk) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  if (walValidHeadCksum(pCkHead) == 0) {
    return 0;
  }

  terrno = TSDB_CODE_WAL_CHKSUM_MISMATCH;
  return -1;
}

420
/*
 * Verify, and if necessary rebuild, the idx file that belongs to the log file
 * at index `fileIdx` of pWal->fileInfoSet.
 *
 * The idx file is accepted unchanged when its size equals exactly one
 * SWalIdxEntry per version in [firstVer, lastVer]. Otherwise the function
 * walks the idx file backwards to the last entry that is positioned correctly
 * and whose referenced log head carries the same version, truncates everything
 * behind it, and appends the missing entries by following the log heads.
 *
 * Returns 0 on success, -1 on failure.
 */
int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
  int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
  ASSERT(fileIdx >= 0 && fileIdx < sz);
  SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
  char          fnameStr[WAL_FILE_LEN];
  walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
  char fLogNameStr[WAL_FILE_LEN];
  walBuildLogName(pWal, pFileInfo->firstVer, fLogNameStr);
  int64_t fileSize = 0;

  // a missing idx file is fine: it is rebuilt from scratch below
  if (taosStatFile(fnameStr, &fileSize, NULL) < 0 && errno != ENOENT) {
    wError("vgId:%d, failed to stat file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  ASSERT(pFileInfo->fileSize > 0 && pFileInfo->firstVer >= 0 && pFileInfo->lastVer >= pFileInfo->firstVer);
  if (fileSize == (pFileInfo->lastVer - pFileInfo->firstVer + 1) * sizeof(SWalIdxEntry)) {
    return 0;
  }

  // start to repair
  int64_t   offset = fileSize - fileSize % sizeof(SWalIdxEntry);
  TdFilePtr pLogFile = NULL;
  TdFilePtr pIdxFile = NULL;
  // Sentinel "entry before the first one": advancing it by one head with an
  // empty body yields version firstVer at log offset 0.
  SWalIdxEntry idxEntry = {.ver = pFileInfo->firstVer - 1, .offset = -(int64_t)sizeof(SWalCkHead)};
  SWalCkHead   ckHead;
  memset(&ckHead, 0, sizeof(ckHead));
  ckHead.head.version = idxEntry.ver;

  pIdxFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE);
  if (pIdxFile == NULL) {
    wError("vgId:%d, failed to open file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  pLogFile = taosOpenFile(fLogNameStr, TD_FILE_READ);
  if (pLogFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    wError("vgId:%d, cannot open file %s, since %s", pWal->cfg.vgId, fLogNameStr, terrstr());
    goto _err;
  }

  // determine the last valid entry end, i.e. offset
  while ((offset -= sizeof(SWalIdxEntry)) >= 0) {
    if (taosLSeekFile(pIdxFile, offset, SEEK_SET) < 0) {
      wError("vgId:%d, failed to seek file due to %s. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, strerror(errno),
             offset, fnameStr);
      terrno = TAOS_SYSTEM_ERROR(errno);
      goto _err;
    }

    if (taosReadFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
      wError("vgId:%d, failed to read file due to %s. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, strerror(errno),
             offset, fnameStr);
      terrno = TAOS_SYSTEM_ERROR(errno);
      goto _err;
    }

    if (idxEntry.ver > pFileInfo->lastVer) {
      continue;
    }

    // the entry must sit at the slot that corresponds to its version
    if (offset != (idxEntry.ver - pFileInfo->firstVer) * sizeof(SWalIdxEntry)) {
      continue;
    }

    if (walReadLogHead(pLogFile, idxEntry.offset, &ckHead) < 0) {
      wWarn("vgId:%d, failed to read log file since %s. file:%s, offset:%" PRId64 ", idx entry ver:%" PRId64 "",
            pWal->cfg.vgId, terrstr(), fLogNameStr, idxEntry.offset, idxEntry.ver);
      continue;
    }

    if (idxEntry.ver == ckHead.head.version) {
      break;
    }
  }

  if (offset < 0) {
    // The scan ran off the front without accepting any entry, so idxEntry and
    // ckHead hold whatever was read last. Restore the sentinel so that the
    // rebuild below starts from the first version of the log file.
    idxEntry.ver = pFileInfo->firstVer - 1;
    idxEntry.offset = -(int64_t)sizeof(SWalCkHead);
    memset(&ckHead, 0, sizeof(ckHead));
    ckHead.head.version = idxEntry.ver;
  }
  offset += sizeof(SWalIdxEntry);

  ASSERT(offset == (idxEntry.ver - pFileInfo->firstVer + 1) * sizeof(SWalIdxEntry));

  // ftruncate idx file
  if (offset < fileSize) {
    if (taosFtruncateFile(pIdxFile, offset) < 0) {
      wError("vgId:%d, failed to ftruncate file due to %s. offset:%" PRId64 ", file:%s", pWal->cfg.vgId,
             strerror(errno), offset, fnameStr);
      terrno = TAOS_SYSTEM_ERROR(errno);
      goto _err;
    }
  }

  // rebuild idx file
  if (taosLSeekFile(pIdxFile, 0, SEEK_END) < 0) {
    wError("vgId:%d, failed to seek file due to %s. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, strerror(errno),
           offset, fnameStr);
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  while (idxEntry.ver < pFileInfo->lastVer) {
    ASSERT(idxEntry.ver == ckHead.head.version);

    // step to the next entry: it starts right behind the current head + body
    idxEntry.ver += 1;
    idxEntry.offset += sizeof(SWalCkHead) + ckHead.head.bodyLen;

    if (walReadLogHead(pLogFile, idxEntry.offset, &ckHead) < 0) {
      wError("vgId:%d, failed to read wal log head since %s. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, terrstr(),
             idxEntry.offset, fLogNameStr);
      goto _err;
    }
    wWarn("vgId:%d, wal idx append new entry %" PRId64 " %" PRId64, pWal->cfg.vgId, idxEntry.ver, idxEntry.offset);
    if (taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)) < 0) {
      // set terrno first so that terrstr() reports this failure
      terrno = TAOS_SYSTEM_ERROR(errno);
      wError("vgId:%d, failed to append file since %s. file:%s", pWal->cfg.vgId, terrstr(), fnameStr);
      goto _err;
    }
  }

  if (taosFsyncFile(pIdxFile) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    wError("vgId:%d, faild to fsync file since %s. file:%s", pWal->cfg.vgId, terrstr(), fnameStr);
    goto _err;
  }

  (void)taosCloseFile(&pLogFile);
  (void)taosCloseFile(&pIdxFile);
  return 0;

_err:
  (void)taosCloseFile(&pLogFile);
  (void)taosCloseFile(&pIdxFile);
  return -1;
}

/*
 * Run walCheckAndRepairIdxFile() for every entry of pWal->fileInfoSet, from the
 * last entry down to the first. Stops at the first failure.
 * Returns 0 when every idx file was handled, -1 otherwise.
 */
int walCheckAndRepairIdx(SWal* pWal) {
  const int32_t fileNum = taosArrayGetSize(pWal->fileInfoSet);

  for (int32_t fileIdx = fileNum - 1; fileIdx >= 0; fileIdx--) {
    if (walCheckAndRepairIdxFile(pWal, fileIdx) >= 0) {
      continue;
    }
    wError("vgId:%d, failed to repair idx file since %s. fileIdx:%d", pWal->cfg.vgId, terrstr(), fileIdx);
    return -1;
  }

  return 0;
}

L
Liu Jicong 已提交
565 566 567 568
/*
 * Close the bookkeeping of the current (last) file info, if any, and append a
 * fresh file info that starts at pWal->vers.lastVer + 1.
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_WAL_OUT_OF_MEMORY when
 * the new record cannot be appended.
 */
int walRollFileInfo(SWal* pWal) {
  int64_t ts = taosGetTimestampSec();

  SArray* pArray = pWal->fileInfoSet;
  if (taosArrayGetSize(pArray) != 0) {
    SWalFileInfo* pInfo = taosArrayGetLast(pArray);
    pInfo->lastVer = pWal->vers.lastVer;
    pInfo->closeTs = ts;
  }

  // taosArrayPush copies the element, so a stack temporary is sufficient
  SWalFileInfo newInfo = {
      .firstVer = pWal->vers.lastVer + 1,
      .lastVer = -1,
      .createTs = ts,
      .closeTs = -1,
      .fileSize = 0,
  };
  if (taosArrayPush(pArray, &newInfo) == NULL) {
    terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

591
/*
 * Serialize the WAL meta into a JSON text of the form
 *   { "meta":  { firstVer, snapshotVer, commitVer, lastVer },
 *     "files": [ { firstVer, lastVer, createTs, closeTs, fileSize }, ... ] }
 * All numbers are emitted as decimal strings.
 *
 * Returns the text produced by cJSON_Print (the caller releases it), or NULL
 * with terrno = TSDB_CODE_WAL_OUT_OF_MEMORY when a JSON node or the output
 * text cannot be allocated.
 */
char* walMetaSerialize(SWal* pWal) {
  char buf[30];
  ASSERT(pWal->fileInfoSet);
  int    sz = taosArrayGetSize(pWal->fileInfoSet);
  cJSON* pRoot = cJSON_CreateObject();
  cJSON* pMeta = cJSON_CreateObject();
  cJSON* pFiles = cJSON_CreateArray();
  cJSON* pField;
  if (pRoot == NULL || pMeta == NULL || pFiles == NULL) {
    if (pRoot) {
      cJSON_Delete(pRoot);
    }
    if (pMeta) {
      cJSON_Delete(pMeta);
    }
    if (pFiles) {
      cJSON_Delete(pFiles);
    }
    terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
    return NULL;
  }

  // pMeta and pFiles are attached to pRoot right away, so deleting pRoot on
  // any later failure releases the whole tree
  cJSON_AddItemToObject(pRoot, "meta", pMeta);
  cJSON_AddItemToObject(pRoot, "files", pFiles);

  snprintf(buf, sizeof(buf), "%" PRId64, pWal->vers.firstVer);
  cJSON_AddStringToObject(pMeta, "firstVer", buf);
  snprintf(buf, sizeof(buf), "%" PRId64, pWal->vers.snapshotVer);
  cJSON_AddStringToObject(pMeta, "snapshotVer", buf);
  snprintf(buf, sizeof(buf), "%" PRId64, pWal->vers.commitVer);
  cJSON_AddStringToObject(pMeta, "commitVer", buf);
  snprintf(buf, sizeof(buf), "%" PRId64, pWal->vers.lastVer);
  cJSON_AddStringToObject(pMeta, "lastVer", buf);

  SWalFileInfo* pData = pWal->fileInfoSet->pData;
  for (int i = 0; i < sz; i++) {
    SWalFileInfo* pInfo = &pData[i];
    pField = cJSON_CreateObject();
    if (pField == NULL) {
      terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
      cJSON_Delete(pRoot);
      return NULL;
    }
    cJSON_AddItemToArray(pFiles, pField);
    // cjson only support int32_t or double
    // string are used to prohibit the loss of precision
    snprintf(buf, sizeof(buf), "%" PRId64, pInfo->firstVer);
    cJSON_AddStringToObject(pField, "firstVer", buf);
    snprintf(buf, sizeof(buf), "%" PRId64, pInfo->lastVer);
    cJSON_AddStringToObject(pField, "lastVer", buf);
    snprintf(buf, sizeof(buf), "%" PRId64, pInfo->createTs);
    cJSON_AddStringToObject(pField, "createTs", buf);
    snprintf(buf, sizeof(buf), "%" PRId64, pInfo->closeTs);
    cJSON_AddStringToObject(pField, "closeTs", buf);
    snprintf(buf, sizeof(buf), "%" PRId64, pInfo->fileSize);
    cJSON_AddStringToObject(pField, "fileSize", buf);
  }

  char* serialized = cJSON_Print(pRoot);
  if (serialized == NULL) {
    terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
  }
  cJSON_Delete(pRoot);
  return serialized;
}

649 650 651 652
/*
 * Parse the JSON text `bytes` (the layout written by walMetaSerialize) and load
 * it into pWal: the four version counters under "meta" and one SWalFileInfo
 * per element of "files". pWal->fileInfoSet must be empty on entry.
 *
 * Returns 0 on success. Returns -1 when the text cannot be parsed, when a
 * required field is missing or is not a JSON string, or when the file info
 * array cannot be grown. On failure the version counters may already have been
 * partially updated.
 */
int walMetaDeserialize(SWal* pWal, const char* bytes) {
  ASSERT(taosArrayGetSize(pWal->fileInfoSet) == 0);
  cJSON *     pRoot, *pMeta, *pFiles, *pField;
  const char* str;  // string payload of the field being read

  pRoot = cJSON_Parse(bytes);
  if (!pRoot) goto _err;
  pMeta = cJSON_GetObjectItem(pRoot, "meta");
  if (!pMeta) goto _err;

  // Every value is stored as a decimal string. A field holding another JSON
  // type yields a NULL string and is rejected instead of being fed to atoll.
  pField = cJSON_GetObjectItem(pMeta, "firstVer");
  if (!pField) goto _err;
  str = cJSON_GetStringValue(pField);
  if (!str) goto _err;
  pWal->vers.firstVer = atoll(str);

  pField = cJSON_GetObjectItem(pMeta, "snapshotVer");
  if (!pField) goto _err;
  str = cJSON_GetStringValue(pField);
  if (!str) goto _err;
  pWal->vers.snapshotVer = atoll(str);

  pField = cJSON_GetObjectItem(pMeta, "commitVer");
  if (!pField) goto _err;
  str = cJSON_GetStringValue(pField);
  if (!str) goto _err;
  pWal->vers.commitVer = atoll(str);

  pField = cJSON_GetObjectItem(pMeta, "lastVer");
  if (!pField) goto _err;
  str = cJSON_GetStringValue(pField);
  if (!str) goto _err;
  pWal->vers.lastVer = atoll(str);

  pFiles = cJSON_GetObjectItem(pRoot, "files");
  int sz = cJSON_GetArraySize(pFiles);
  // deserialize
  SArray* pArray = pWal->fileInfoSet;
  // the records are written straight into pData, so the capacity must exist
  if (taosArrayEnsureCap(pArray, sz) != 0) goto _err;
  SWalFileInfo* pData = pArray->pData;
  for (int i = 0; i < sz; i++) {
    cJSON* pInfoJson = cJSON_GetArrayItem(pFiles, i);
    if (!pInfoJson) goto _err;
    SWalFileInfo* pInfo = &pData[i];

    pField = cJSON_GetObjectItem(pInfoJson, "firstVer");
    if (!pField) goto _err;
    str = cJSON_GetStringValue(pField);
    if (!str) goto _err;
    pInfo->firstVer = atoll(str);

    pField = cJSON_GetObjectItem(pInfoJson, "lastVer");
    if (!pField) goto _err;
    str = cJSON_GetStringValue(pField);
    if (!str) goto _err;
    pInfo->lastVer = atoll(str);

    pField = cJSON_GetObjectItem(pInfoJson, "createTs");
    if (!pField) goto _err;
    str = cJSON_GetStringValue(pField);
    if (!str) goto _err;
    pInfo->createTs = atoll(str);

    pField = cJSON_GetObjectItem(pInfoJson, "closeTs");
    if (!pField) goto _err;
    str = cJSON_GetStringValue(pField);
    if (!str) goto _err;
    pInfo->closeTs = atoll(str);

    pField = cJSON_GetObjectItem(pInfoJson, "fileSize");
    if (!pField) goto _err;
    str = cJSON_GetStringValue(pField);
    if (!str) goto _err;
    pInfo->fileSize = atoll(str);
  }
  taosArraySetSize(pArray, sz);
  pWal->fileInfoSet = pArray;
  pWal->writeCur = sz - 1;
  cJSON_Delete(pRoot);
  return 0;

_err:
  cJSON_Delete(pRoot);
  return -1;
}

/*
 * Look through pWal->path for files named "meta-ver<N>" and return the highest
 * N found, or -1 when the directory cannot be opened or holds no such file.
 *
 * More than one meta file can be present (walSaveMeta renames the new file
 * into place before it removes the old one), so the highest version is chosen
 * rather than whichever entry the directory listing returns first.
 */
static int walFindCurMetaVer(SWal* pWal) {
  const char* pattern = "^meta-ver[0-9]+$";
  regex_t     walMetaRegexPattern;
  regcomp(&walMetaRegexPattern, pattern, REG_EXTENDED);

  TdDirPtr pDir = taosOpenDir(pWal->path);
  if (pDir == NULL) {
    wError("vgId:%d, path:%s, failed to open since %s", pWal->cfg.vgId, pWal->path, strerror(errno));
    // release the compiled pattern on this path as well
    regfree(&walMetaRegexPattern);
    return -1;
  }

  TdDirEntryPtr pDirEntry;

  // find existing meta-ver[x]
  int metaVer = -1;
  while ((pDirEntry = taosReadDir(pDir)) != NULL) {
    char* name = taosDirEntryBaseName(taosGetDirEntryName(pDirEntry));
    int   code = regexec(&walMetaRegexPattern, name, 0, NULL, 0);
    if (code == 0) {
      int ver = -1;
      sscanf(name, "meta-ver%d", &ver);
      wDebug("vgId:%d, wal find current meta: %s is the meta file, ver %d", pWal->cfg.vgId, name, ver);
      if (ver > metaVer) {
        metaVer = ver;
      }
      continue;
    }
    wDebug("vgId:%d, wal find current meta: %s is not meta file", pWal->cfg.vgId, name);
  }
  taosCloseDir(&pDir);
  regfree(&walMetaRegexPattern);
  return metaVer;
}

L
Liu Jicong 已提交
736
/*
 * Persist the current WAL meta.
 *
 * The idx and log files are fsynced first, then the serialized meta is written
 * to "<path>/meta-ver.tmp", fsynced, closed and renamed to
 * "<path>/meta-ver<N+1>", where N is the version of the current meta file
 * (-1 if none). The previous meta file is removed afterwards.
 *
 * Returns 0 on success, -1 with terrno set on failure.
 */
int walSaveMeta(SWal* pWal) {
  int  metaVer = walFindCurMetaVer(pWal);
  char fnameStr[WAL_FILE_LEN];
  char tmpFnameStr[WAL_FILE_LEN];
  int  n;

  // fsync the idx and log file at first to ensure validity of meta
  if (taosFsyncFile(pWal->pIdxFile) < 0) {
    wError("vgId:%d, failed to sync idx file due to %s", pWal->cfg.vgId, strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  if (taosFsyncFile(pWal->pLogFile) < 0) {
    wError("vgId:%d, failed to sync log file due to %s", pWal->cfg.vgId, strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  // flush to a tmpfile
  n = walBuildTmpMetaName(pWal, tmpFnameStr);
  ASSERT(n < sizeof(tmpFnameStr) && "Buffer overflow of file name");

  TdFilePtr pMetaFile = taosOpenFile(tmpFnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pMetaFile == NULL) {
    wError("vgId:%d, failed to open file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  char* serialized = walMetaSerialize(pWal);
  if (serialized == NULL) {
    // serialization returns NULL when it cannot allocate; do not strlen(NULL)
    wError("vgId:%d, failed to serialize wal meta. file:%s", pWal->cfg.vgId, tmpFnameStr);
    terrno = TAOS_SYSTEM_ERROR(ENOMEM);
    goto _err;
  }
  int len = strlen(serialized);
  if (len != taosWriteFile(pMetaFile, serialized, len)) {
    wError("vgId:%d, failed to write file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr);
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  if (taosFsyncFile(pMetaFile) < 0) {
    wError("vgId:%d, failed to sync file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr);
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  if (taosCloseFile(&pMetaFile) < 0) {
    wError("vgId:%d, failed to close file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), tmpFnameStr);
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  // rename it
  n = walBuildMetaName(pWal, metaVer + 1, fnameStr);
  ASSERT(n < sizeof(fnameStr) && "Buffer overflow of file name");

  if (taosRenameFile(tmpFnameStr, fnameStr) < 0) {
    wError("failed to rename file due to %s. dest:%s", strerror(errno), fnameStr);
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  // delete old file
  if (metaVer > -1) {
    walBuildMetaName(pWal, metaVer, fnameStr);
    taosRemoveFile(fnameStr);
  }
  taosMemoryFree(serialized);
  return 0;

_err:
  taosCloseFile(&pMetaFile);
  taosMemoryFree(serialized);
  return -1;
}

L
Liu Jicong 已提交
810
/*
 * Locate the current meta file in pWal->path, read it completely and load it
 * into pWal through walMetaDeserialize(). pWal->fileInfoSet must be empty on
 * entry.
 *
 * Returns 0 on success. Returns -1 when no meta file exists, when the meta file
 * is empty (it is then removed), or when it cannot be stat'ed, opened, read or
 * deserialized.
 */
int walLoadMeta(SWal* pWal) {
  ASSERT(pWal->fileInfoSet->size == 0);
  // find existing meta file
  int metaVer = walFindCurMetaVer(pWal);
  if (metaVer == -1) {
    wDebug("vgId:%d, wal find meta ver %d", pWal->cfg.vgId, metaVer);
    return -1;
  }
  char fnameStr[WAL_FILE_LEN];
  walBuildMetaName(pWal, metaVer, fnameStr);
  // read metafile
  int64_t fileSize = 0;
  // A failed stat leaves fileSize at 0; it must not be taken for an empty file,
  // because empty meta files are deleted below.
  if (taosStatFile(fnameStr, &fileSize, NULL) < 0) {
    wError("vgId:%d, failed to stat file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (fileSize == 0) {
    taosRemoveFile(fnameStr);
    wDebug("vgId:%d, wal find empty meta ver %d", pWal->cfg.vgId, metaVer);
    return -1;
  }
  int   size = (int)fileSize;
  // a few zeroed bytes of slack keep the text NUL-terminated for the parser
  char* buf = taosMemoryMalloc(size + 5);
  if (buf == NULL) {
    terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
    return -1;
  }
  memset(buf, 0, size + 5);
  TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ);
  if (pFile == NULL) {
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    taosMemoryFree(buf);
    return -1;
  }
  if (taosReadFile(pFile, buf, size) != size) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    taosCloseFile(&pFile);
    taosMemoryFree(buf);
    return -1;
  }
  // load into fileInfoSet
  int code = walMetaDeserialize(pWal, buf);
  if (code < 0) {
    wError("failed to deserialize wal meta. file:%s", fnameStr);
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
  }
  taosCloseFile(&pFile);
  taosMemoryFree(buf);
  return code;
}
857 858 859 860 861 862

/*
 * Delete the current meta file of the WAL.
 * Returns 0 when no meta file is found, otherwise the result of
 * taosRemoveFile() on that file.
 */
int walRemoveMeta(SWal* pWal) {
  const int curVer = walFindCurMetaVer(pWal);
  if (curVer == -1) {
    return 0;
  }

  char metaPath[WAL_FILE_LEN];
  walBuildMetaName(pWal, curVer, metaPath);
  return taosRemoveFile(metaPath);
}