walWrite.c 16.0 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
L
Liu Jicong 已提交
17

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
18
#include "os.h"
S
Shengliang Guan 已提交
19
#include "taoserror.h"
20
#include "tchecksum.h"
S
TD-1895  
Shengliang Guan 已提交
21
#include "tfile.h"
S
Shengliang Guan 已提交
22
#include "walInt.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
23

L
Liu Jicong 已提交
24 25
static void walFtruncate(SWal *pWal, int64_t ver);

L
Liu Jicong 已提交
26
/**
 * Record `ver` as the committed version of the WAL.
 *
 * The ordering snapshotVersion <= commitVersion <= ver <= lastVersion is
 * checked with ASSERT before the commit version is overwritten.
 *
 * @param pWal WAL handle; dereferenced without a NULL check.
 * @param ver  version to store in pWal->commitVersion.
 * @return always 0.
 */
int32_t walCommit(SWal *pWal, int64_t ver) {
  // The state we start from must already be ordered: snapshot <= commit <= last.
  ASSERT(pWal->commitVersion >= pWal->snapshotVersion);
  ASSERT(pWal->lastVersion >= pWal->commitVersion);

  // The requested version has to lie inside [commitVersion, lastVersion].
  ASSERT(pWal->commitVersion <= ver);
  ASSERT(pWal->lastVersion >= ver);

  pWal->commitVersion = ver;
  return 0;
}

/**
 * Roll the WAL back to `ver`: seek to that version, then truncate the current
 * log and index files via walFtruncate().
 *
 * `ver` must be greater than the committed version and not beyond the last
 * written version (both enforced with ASSERT).
 *
 * @param pWal WAL handle; dereferenced without a NULL check.
 * @param ver  version to roll back to.
 * @return 0 on success, -1 if seeking to `ver` failed (nothing is truncated
 *         in that case).
 */
int32_t walRollback(SWal *pWal, int64_t ver) {
  // TODO: ftruncate
  ASSERT(ver > pWal->commitVersion);
  ASSERT(ver <= pWal->lastVersion);

  // Seek first. If the seek fails the current file handles may not refer to
  // the file holding `ver`, so truncating would destroy unrelated data.
  if (walSeekVer(pWal, ver) != 0) {
    return -1;
  }

  walFtruncate(pWal, ver);
  // NOTE(review): pWal->lastVersion is not adjusted after the truncation —
  // confirm whether that is expected to happen elsewhere.
  return 0;
}

/**
 * Record `ver` as the snapshot version of the WAL.
 *
 * The file-retention logic is not implemented yet: the function only looks up
 * the file-info entry matching `ver` (search mode TD_LE) and leaves the
 * remaining steps as TODOs.
 *
 * @param pWal WAL handle; dereferenced without a NULL check.
 * @param ver  version covered by the snapshot.
 * @return always 0.
 */
int32_t walTakeSnapshot(SWal *pWal, int64_t ver) {
  pWal->snapshotVersion = ver;

  // Search key. Every field other than firstVer is zero-initialised so the
  // comparator never reads indeterminate memory.
  WalFileInfo tmp = {.firstVer = ver};

  // mark files safe to delete
  WalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE);
  (void)pInfo;  // not consumed until the retention logic below is implemented

  // TODO: iterate files, until the searched result
  //   if totSize > rtSize, delete
  //   if createTs > retentionTs, delete
  // TODO: save snapshot ver, commit ver
  // TODO: make new array, remove files

  return 0;
}

L
Liu Jicong 已提交
63
#if 0
S
TD-1882  
Shengliang Guan 已提交
64
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
65

S
Shengliang Guan 已提交
66
int32_t walRenew(void *handle) {
67
  if (handle == NULL) return 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
68

S
TD-1846  
Shengliang Guan 已提交
69 70
  SWal *  pWal = handle;
  int32_t code = 0;
71

L
Liu Jicong 已提交
72 73 74 75
  /*if (pWal->stop) {*/
    /*wDebug("vgId:%d, do not create a new wal file", pWal->vgId);*/
    /*return 0;*/
  /*}*/
S
TD-1894  
Shengliang Guan 已提交
76

77 78
  pthread_mutex_lock(&pWal->mutex);

L
Liu Jicong 已提交
79 80 81
  if (tfValid(pWal->logTfd)) {
    tfClose(pWal->logTfd);
    wDebug("vgId:%d, file:%s, it is closed while renew", pWal->vgId, pWal->logName);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
82 83
  }

L
Liu Jicong 已提交
84 85 86 87 88 89
  /*if (pWal->keep == TAOS_WAL_KEEP) {*/
    /*pWal->fileId = 0;*/
  /*} else {*/
    /*if (walGetNewFile(pWal, &pWal->fileId) != 0) pWal->fileId = 0;*/
    /*pWal->fileId++;*/
  /*}*/
90

L
Liu Jicong 已提交
91 92
  snprintf(pWal->logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->curFileId);
  pWal->logTfd = tfOpenCreateWrite(pWal->logName);
93

L
Liu Jicong 已提交
94
  if (!tfValid(pWal->logTfd)) {
S
TD-1846  
Shengliang Guan 已提交
95
    code = TAOS_SYSTEM_ERROR(errno);
L
Liu Jicong 已提交
96
    wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->logName, strerror(errno));
97
  } else {
L
Liu Jicong 已提交
98
    wDebug("vgId:%d, file:%s, it is created and open while renew", pWal->vgId, pWal->logName);
S
TD-1846  
Shengliang Guan 已提交
99
  }
100

S
TD-1949  
Shengliang Guan 已提交
101 102 103 104 105
  pthread_mutex_unlock(&pWal->mutex);

  return code;
}

S
TD-1949  
Shengliang Guan 已提交
106
void walRemoveOneOldFile(void *handle) {
S
TD-1949  
Shengliang Guan 已提交
107 108
  SWal *pWal = handle;
  if (pWal == NULL) return;
L
Liu Jicong 已提交
109
  /*if (pWal->keep == TAOS_WAL_KEEP) return;*/
L
Liu Jicong 已提交
110
  if (!tfValid(pWal->logTfd)) return;
S
TD-1949  
Shengliang Guan 已提交
111 112 113 114 115

  pthread_mutex_lock(&pWal->mutex);

  // remove the oldest wal file
  int64_t oldFileId = -1;
L
Liu Jicong 已提交
116
  if (walGetOldFile(pWal, pWal->curFileId, WAL_FILE_NUM, &oldFileId) == 0) {
S
TD-1949  
Shengliang Guan 已提交
117 118 119 120 121 122 123
    char walName[WAL_FILE_LEN] = {0};
    snprintf(walName, sizeof(walName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, oldFileId);

    if (remove(walName) < 0) {
      wError("vgId:%d, file:%s, failed to remove since %s", pWal->vgId, walName, strerror(errno));
    } else {
      wInfo("vgId:%d, file:%s, it is removed", pWal->vgId, walName);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
124
    }
S
TD-1652  
Shengliang Guan 已提交
125 126
  }

127
  pthread_mutex_unlock(&pWal->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
128 129
}

S
TD-1949  
Shengliang Guan 已提交
130 131 132 133 134
void walRemoveAllOldFiles(void *handle) {
  if (handle == NULL) return;

  SWal *  pWal = handle;
  int64_t fileId = -1;
S
TD-2087  
Shengliang Guan 已提交
135 136

  pthread_mutex_lock(&pWal->mutex);
S
Shengliang Guan 已提交
137
  
L
Liu Jicong 已提交
138 139
  tfClose(pWal->logTfd);
  wDebug("vgId:%d, file:%s, it is closed before remove all wals", pWal->vgId, pWal->logName);
S
Shengliang Guan 已提交
140

S
TD-1949  
Shengliang Guan 已提交
141
  while (walGetNextFile(pWal, &fileId) >= 0) {
L
Liu Jicong 已提交
142
    snprintf(pWal->logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
S
TD-1949  
Shengliang Guan 已提交
143

L
Liu Jicong 已提交
144 145
    if (remove(pWal->logName) < 0) {
      wError("vgId:%d, wal:%p file:%s, failed to remove since %s", pWal->vgId, pWal, pWal->logName, strerror(errno));
S
TD-1949  
Shengliang Guan 已提交
146
    } else {
L
Liu Jicong 已提交
147
      wInfo("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->logName);
S
TD-1949  
Shengliang Guan 已提交
148 149
    }
  }
S
TD-2087  
Shengliang Guan 已提交
150
  pthread_mutex_unlock(&pWal->mutex);
S
TD-1949  
Shengliang Guan 已提交
151
}
L
Liu Jicong 已提交
152
#endif
153

L
Liu Jicong 已提交
154
/**
 * Roll the WAL over to a fresh pair of index/log files.
 *
 * The currently open index and log files are closed, a new pair named after
 * version lastVersion + 1 is created, the file-info set is extended through
 * walRollFileInfo(), and the new handles become the current ones.
 *
 * @param pWal WAL handle; dereferenced without a NULL check.
 * @return 0 on success, -1 on any failure. On failure no newly opened handle
 *         is leaked and pWal never keeps a handle that was already closed.
 */
int walRoll(SWal *pWal) {
  // Close the files being written. Reset each handle to -1 once closed so a
  // later failure in this function cannot leave pWal holding a stale handle
  // that would be closed a second time on the next roll.
  if (pWal->curIdxTfd != -1) {
    if (tfClose(pWal->curIdxTfd) != 0) {
      return -1;
    }
    pWal->curIdxTfd = -1;
  }
  if (pWal->curLogTfd != -1) {
    if (tfClose(pWal->curLogTfd) != 0) {
      return -1;
    }
    pWal->curLogTfd = -1;
  }

  // create new file
  int64_t newFileFirstVersion = pWal->lastVersion + 1;
  char    fnameStr[WAL_FILE_LEN];

  walBuildIdxName(pWal, newFileFirstVersion, fnameStr);
  int64_t idxTfd = tfOpenCreateWrite(fnameStr);
  if (idxTfd < 0) {
    ASSERT(0);
    return -1;
  }

  walBuildLogName(pWal, newFileFirstVersion, fnameStr);
  int64_t logTfd = tfOpenCreateWrite(fnameStr);
  if (logTfd < 0) {
    ASSERT(0);
    tfClose(idxTfd);  // do not leak the index file opened just above
    return -1;
  }

  if (walRollFileInfo(pWal) != 0) {
    ASSERT(0);
    tfClose(logTfd);
    tfClose(idxTfd);
    return -1;
  }

  // switch file
  pWal->curIdxTfd = idxTfd;
  pWal->curLogTfd = logTfd;

  // change status: both the file and the position are writable. The flags
  // are combined with OR; AND-ing two distinct flag bits would yield 0.
  // NOTE(review): assumes WAL_CUR_* are independent bit flags — confirm in walInt.h.
  pWal->curStatus = WAL_CUR_FILE_WRITABLE | WAL_CUR_POS_WRITABLE;

  pWal->lastRollSeq = walGetSeq();
  return 0;
}

/**
 * Make the last entry of pWal->fileInfoSet the current file pair by opening
 * its index and log files read/write.
 *
 * On success curIdxTfd/curLogTfd point at the newly opened files, curVersion
 * is set to that file's first version and curStatus to WAL_CUR_FILE_WRITABLE.
 *
 * @param pWal WAL handle; its fileInfoSet must not be empty (ASSERT).
 * @return 0 on success, -1 if either file cannot be opened (pWal is left
 *         unchanged and no handle is leaked).
 */
int walChangeFileToLast(SWal *pWal) {
  WalFileInfo *pRet = taosArrayGetLast(pWal->fileInfoSet);
  ASSERT(pRet != NULL);
  int64_t fileFirstVer = pRet->firstVer;

  char fnameStr[WAL_FILE_LEN];

  walBuildIdxName(pWal, fileFirstVer, fnameStr);
  int64_t idxTfd = tfOpenReadWrite(fnameStr);
  if (idxTfd < 0) {
    return -1;
  }

  walBuildLogName(pWal, fileFirstVer, fnameStr);
  int64_t logTfd = tfOpenReadWrite(fnameStr);
  if (logTfd < 0) {
    tfClose(idxTfd);  // the index file was opened successfully; release it
    return -1;
  }

  // switch file
  // NOTE(review): the previous curIdxTfd/curLogTfd are overwritten without
  // being closed here — confirm that callers close them beforehand.
  pWal->curIdxTfd = idxTfd;
  pWal->curLogTfd = logTfd;

  // change status
  pWal->curVersion = fileFirstVer;
  pWal->curStatus = WAL_CUR_FILE_WRITABLE;
  return 0;
}

L
Liu Jicong 已提交
226
/**
 * Append one {version, offset} pair to the current index file.
 *
 * @param pWal   WAL handle; dereferenced without a NULL check.
 * @param ver    version of the log record.
 * @param offset offset value to store next to `ver`.
 * @return 0 on success; TAOS_SYSTEM_ERROR(errno) when the current index
 *         handle is not valid; -1 when the write is short or fails.
 */
static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
  // get index file
  if (!tfValid(pWal->curIdxTfd)) {
    int errCode = TAOS_SYSTEM_ERROR(errno);

    // The file-info entry under the cursor is looked up only to name the file in the log line.
    WalFileInfo *pCurInfo = taosArrayGet(pWal->fileInfoSet, pWal->fileCursor);
    wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pCurInfo->firstVer, strerror(errno));
    return errCode;
  }

  // One index entry is two consecutive 64-bit values: version, then offset.
  int64_t entry[2] = {ver, offset};
  int     written = tfWrite(pWal->curIdxTfd, entry, sizeof(entry));
  if (written == sizeof(entry)) {
    return 0;
  }
  return -1;
}

L
Liu Jicong 已提交
244
/**
 * Append one record (header + body) to the WAL and index it.
 *
 * Only the next consecutive version (lastVersion + 1) is accepted; skipping
 * or rewriting a version is rejected and requires an explicit truncate first.
 * Before writing, the WAL rolls to a new file when no file exists yet, when
 * rollPeriod has elapsed, or when the last file exceeds segSize.
 *
 * @param pWal    WAL handle; NULL is rejected.
 * @param index   version of the record; must equal pWal->lastVersion + 1.
 * @param msgType message type stored in the record header.
 * @param body    record payload (bodyLen bytes).
 * @param bodyLen payload length in bytes; must not be negative.
 * @return 0 on success, or when the WAL level is TAOS_WAL_NOLOG (nothing is
 *         written); -1 on invalid arguments, an out-of-sequence index or a
 *         failed roll; otherwise a non-zero error code from the failed write.
 */
int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, int32_t bodyLen) {
  if (pWal == NULL) return -1;

  // no wal
  if (pWal->level == TAOS_WAL_NOLOG) return 0;

  // A negative length, or a missing payload with a positive length, can
  // neither be checksummed nor written.
  if (bodyLen < 0 || (body == NULL && bodyLen > 0)) return -1;

  // reject skip log or rewrite log
  // must truncate explicitly first
  if (index != pWal->lastVersion + 1) return -1;

  // NOTE(review): rolling and header preparation happen before pWal->mutex
  // is taken, as in the original; confirm callers serialise walWrite().
  int code = 0;
  if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
    code = walRoll(pWal);
    ASSERT(code == 0);
  } else {
    int64_t passed = walGetSeq() - pWal->lastRollSeq;
    if (pWal->rollPeriod != -1 && passed > pWal->rollPeriod) {
      code = walRoll(pWal);
    } else if (pWal->segSize != -1 && walGetLastFileSize(pWal) > pWal->segSize) {
      code = walRoll(pWal);
    }
  }
  // A failed roll leaves no writable file; do not attempt the write.
  if (code != 0) return -1;

  pWal->head.version = index;
  pWal->head.signature = WAL_SIGNATURE;
  pWal->head.len = bodyLen;
  pWal->head.msgType = msgType;

  // The header checksum covers everything except the last two uint32_t of SWalHead.
  pWal->head.cksumHead = taosCalcChecksum(0, (const uint8_t *)&pWal->head, sizeof(SWalHead) - sizeof(uint32_t) * 2);
  // Checksum the payload bytes themselves, not the address of the `body` parameter.
  pWal->head.cksumBody = taosCalcChecksum(0, (const uint8_t *)body, bodyLen);

  pthread_mutex_lock(&pWal->mutex);

  if (tfWrite(pWal->curLogTfd, &pWal->head, sizeof(SWalHead)) != sizeof(SWalHead)) {
    // TODO: ftruncate the partially written record
    code = TAOS_SYSTEM_ERROR(errno);
    if (code == 0) code = -1;  // never report success for a failed write
    wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno));
  } else if (tfWrite(pWal->curLogTfd, (void *)body, bodyLen) != bodyLen) {
    // The cast only drops const for the call; the payload is written from
    // `body` itself rather than from the parameter's stack slot.
    // TODO: ftruncate the partially written record
    code = TAOS_SYSTEM_ERROR(errno);
    if (code == 0) code = -1;
    wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno));
  } else {
    code = walWriteIndex(pWal, index, walGetCurFileOffset(pWal));
    if (code != 0) {
      // TODO: the record is in the log but not in the index.
      // NOTE(review): status is still advanced below, as in the original.
    }

    // set status
    pWal->lastVersion = index;
    walGetCurFileInfo(pWal)->lastVer = index;
    walGetCurFileInfo(pWal)->fileSize += sizeof(SWalHead) + bodyLen;
  }

  pthread_mutex_unlock(&pWal->mutex);

  return code;
}

L
Liu Jicong 已提交
307
/**
 * Flush the current log file to stable storage when required.
 *
 * An fsync is issued when `forceFsync` is true, or when the WAL level is
 * TAOS_WAL_FSYNC with an fsyncPeriod of 0. Failures are logged only.
 *
 * @param pWal       WAL handle; NULL or an invalid current log handle is a no-op.
 * @param forceFsync true to fsync regardless of level and period.
 */
void walFsync(SWal *pWal, bool forceFsync) {
  if (pWal == NULL) {
    return;
  }
  if (!tfValid(pWal->curLogTfd)) {
    return;
  }

  bool syncOnEveryCall = (pWal->level == TAOS_WAL_FSYNC) && (pWal->fsyncPeriod == 0);
  if (!forceFsync && !syncOnEveryCall) {
    return;
  }

  wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->vgId, walGetCurFileFirstVer(pWal));
  if (tfFsync(pWal->curLogTfd) < 0) {
    wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->vgId, walGetCurFileFirstVer(pWal), strerror(errno));
  }
}

L
Liu Jicong 已提交
318
#if 0
S
TD-1918  
Shengliang Guan 已提交
319
/**
 * Replay every WAL file enumerated by walGetNextFile() through `writeFp`,
 * then leave a log file open for subsequent writes.
 *
 * Files that fail to restore are logged and skipped. If no file was restored
 * the WAL is renewed via walRenew(); otherwise the file named after
 * pWal->curFileId is opened in append mode.
 *
 * @param handle  SWal pointer; NULL is rejected.
 * @param pVnode  opaque context forwarded to walRestoreWalFile().
 * @param writeFp callback forwarded to walRestoreWalFile().
 * @return -1 if `handle` is NULL; the result of walRenew() when nothing was
 *         restored; TAOS_SYSTEM_ERROR(errno) if the append-mode open fails;
 *         TSDB_CODE_SUCCESS otherwise.
 */
int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
  if (handle == NULL) return -1;

  SWal   *pWal = handle;
  int32_t count = 0;
  int32_t code = 0;
  int64_t fileId = -1;

  while ((code = walGetNextFile(pWal, &fileId)) >= 0) {
    char walName[WAL_FILE_LEN];
    // Bound the write by the destination buffer, not by pWal->logName.
    snprintf(walName, sizeof(walName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);

    wInfo("vgId:%d, file:%s, will be restored", pWal->vgId, walName);
    code = walRestoreWalFile(pWal, pVnode, writeFp, walName, fileId);
    if (code != TSDB_CODE_SUCCESS) {
      wError("vgId:%d, file:%s, failed to restore since %s", pWal->vgId, walName, tstrerror(code));
      continue;
    }

    wInfo("vgId:%d, file:%s, restore success, wver:%" PRIu64, pWal->vgId, walName, pWal->curVersion);

    count++;
  }

  if (count == 0) {
    wDebug("vgId:%d, wal file not exist, renew it", pWal->vgId);
    return walRenew(pWal);
  }

  // open the existing WAL file in append mode
  snprintf(pWal->logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->curFileId);
  pWal->logTfd = tfOpenCreateWriteAppend(pWal->logName);
  if (!tfValid(pWal->logTfd)) {
    int savedErrno = errno;  // logging may overwrite errno before it is returned
    wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->logName, strerror(savedErrno));
    return TAOS_SYSTEM_ERROR(savedErrno);
  }
  wDebug("vgId:%d, file:%s, it is created and open while restore", pWal->vgId, pWal->logName);

  return TSDB_CODE_SUCCESS;
}

S
TD-1846  
Shengliang Guan 已提交
365 366 367
int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) {
  if (handle == NULL) return -1;
  SWal *pWal = handle;
368

S
TD-1846  
Shengliang Guan 已提交
369 370
  if (*fileId == 0) *fileId = -1;

371
  pthread_mutex_lock(&(pWal->mutex));
S
TD-1846  
Shengliang Guan 已提交
372

S
TD-1846  
Shengliang Guan 已提交
373 374 375
  int32_t code = walGetNextFile(pWal, fileId);
  if (code >= 0) {
    sprintf(fileName, "wal/%s%" PRId64, WAL_PREFIX, *fileId);
L
Liu Jicong 已提交
376
    /*code = (*fileId == pWal->curFileId) ? 0 : 1;*/
377
  }
S
TD-1846  
Shengliang Guan 已提交
378

L
Liu Jicong 已提交
379
  wDebug("vgId:%d, get wal file, code:%d curId:%" PRId64 " outId:%" PRId64, pWal->vgId, code, pWal->curFileId, *fileId);
380 381 382
  pthread_mutex_unlock(&(pWal->mutex));

  return code;
S
TD-1652  
Shengliang Guan 已提交
383
}
L
Liu Jicong 已提交
384
#endif
385

L
Liu Jicong 已提交
386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415
/**
 * Check that the record read for `ver` really carries that version.
 *
 * @param pWal WAL handle passed through to walRead().
 * @param ver  version expected in the record header.
 * @return 0 if a header was read and its version equals `ver`; -1 if the
 *         read failed, produced no header, or the version differs.
 */
static int walValidateOffset(SWal *pWal, int64_t ver) {
  SWalHead *pHead = NULL;
  int       code = (int)walRead(pWal, &pHead, ver);

  // A failed read may leave pHead NULL; never dereference it in that case.
  if (code < 0 || pHead == NULL) {
    return -1;
  }
  // NOTE(review): ownership of *pHead after walRead() is not visible here —
  // confirm whether it has to be released by this function.
  if (pHead->version != ver) {
    return -1;
  }
  return 0;
}

/**
 * Seek the WAL to `ver` and validate the record found there.
 *
 * NOTE(review): despite the name, no offset is returned — the result is 0 on
 * success; confirm the intended contract.
 *
 * @param pWal WAL handle.
 * @param ver  version to seek to and validate.
 * @return 0 if both the seek and the validation succeed, -1 otherwise.
 */
static int64_t walGetOffset(SWal *pWal, int64_t ver) {
  // Validation is attempted only after a successful seek.
  if (walSeekVer(pWal, ver) != 0 || walValidateOffset(pWal, ver) != 0) {
    return -1;
  }
  return 0;
}

/**
 * Truncate and fsync the current log file, then the current index file.
 *
 * The log file is truncated to `ver` and the index file to
 * `ver * WAL_IDX_ENTRY_SIZE`. Return values of the tf* calls are ignored.
 *
 * NOTE(review): `ver` is a version number, yet it is passed as the length of
 * the log file; the index length also ignores the file's first version.
 * Confirm both against the on-disk layout.
 *
 * @param pWal WAL handle; dereferenced without a NULL check.
 * @param ver  version used to derive both truncation lengths.
 */
static void walFtruncate(SWal *pWal, int64_t ver) {
  // Log file first ...
  int64_t logTfd = pWal->curLogTfd;
  tfFtruncate(logTfd, ver);
  tfFsync(logTfd);

  // ... then the index file.
  int64_t idxTfd = pWal->curIdxTfd;
  tfFtruncate(idxTfd, ver * WAL_IDX_ENTRY_SIZE);
  tfFsync(idxTfd);
}

L
Liu Jicong 已提交
419
#if 0
S
TD-1895  
Shengliang Guan 已提交
420
/**
 * Scan forward byte by byte from `*offset + 1` for the next record whose
 * signature matches, whose sver is at least 1 and whose checksum validates.
 *
 * @param pWal   WAL handle (used for logging only).
 * @param pHead  buffer receiving each candidate header and, at pHead->cont,
 *               its body.
 * @param tfd    handle of the file being scanned.
 * @param offset in: position of the corrupted record; out: position of the
 *               recovered record (written only on success).
 * @return TSDB_CODE_SUCCESS when a valid record is found,
 *         TSDB_CODE_WAL_FILE_CORRUPTED when seeking or reading fails first.
 */
static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) {
  for (int64_t pos = *offset + 1;; ++pos) {
    if (tfLseek(tfd, pos, SEEK_SET) < 0) {
      wError("vgId:%d, failed to seek from corrupted wal file since %s", pWal->vgId, strerror(errno));
      return TSDB_CODE_WAL_FILE_CORRUPTED;
    }

    if (tfRead(tfd, pHead, sizeof(SWalHead)) <= 0) {
      wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos);
      return TSDB_CODE_WAL_FILE_CORRUPTED;
    }

    // Candidates without the signature, or with sver below 1, are never
    // accepted; move on to the next byte position.
    if (pHead->signature != WAL_SIGNATURE || pHead->sver < 1) {
      continue;
    }

    if (tfRead(tfd, pHead->cont, pHead->len) < pHead->len) {
      wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos);
      return TSDB_CODE_WAL_FILE_CORRUPTED;
    }

    if (walValidateChecksum(pHead)) {
      wInfo("vgId:%d, wal whole cksum check passed, offset:%" PRId64, pWal->vgId, pos);
      *offset = pos;
      return TSDB_CODE_SUCCESS;
    }
  }

  return TSDB_CODE_WAL_FILE_CORRUPTED;
}
455

S
TD-1882  
Shengliang Guan 已提交
456
/*
 * Replay one WAL file: read records sequentially from `name` and hand each
 * one to `writeFp`.
 *
 * pWal    - WAL handle; curVersion is updated to each replayed record's version.
 * pVnode  - opaque context passed as the first argument of `writeFp`.
 * writeFp - callback invoked once per record with (pVnode, pHead).
 * name    - path of the file to restore.
 * fileId  - id of the file, used for tracing only.
 *
 * Returns TSDB_CODE_SUCCESS when the loop ends without a recorded error,
 * TAOS_SYSTEM_ERROR(errno) on allocation/open/read failures, or the code
 * returned by walSkipCorruptedRecord() when recovery fails.
 *
 * NOTE(review): this block is inside `#if 0` and calls walFtruncate() with
 * three arguments, while the walFtruncate() declared in this file takes two;
 * it will not compile as-is if re-enabled.
 */
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId) {
  // One scratch buffer of WAL_MAX_SIZE bytes holds a header followed by its body.
  int32_t size = WAL_MAX_SIZE;
  void *  buffer = malloc(size);
  if (buffer == NULL) {
    wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno));
    return TAOS_SYSTEM_ERROR(errno);
  }

  int64_t tfd = tfOpenReadWrite(name);
  if (!tfValid(tfd)) {
    wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno));
    tfree(buffer);
    return TAOS_SYSTEM_ERROR(errno);
  } else {
    wDebug("vgId:%d, file:%s, open for restore", pWal->vgId, name);
  }

  int32_t   code = TSDB_CODE_SUCCESS;
  int64_t   offset = 0;  // advanced past each record that is accepted below
  SWalHead *pHead = buffer;

  while (1) {
    // Read the fixed-size header; a return of 0 ends the loop.
    int32_t ret = (int32_t)tfRead(tfd, pHead, sizeof(SWalHead));
    if (ret == 0) break;

    if (ret < 0) {
      wError("vgId:%d, file:%s, failed to read wal head since %s", pWal->vgId, name, strerror(errno));
      code = TAOS_SYSTEM_ERROR(errno);
      break;
    }

    // A short header read: truncate at the current offset and stop.
    if (ret < sizeof(SWalHead)) {
      wError("vgId:%d, file:%s, failed to read wal head, ret is %d", pWal->vgId, name, ret);
      walFtruncate(pWal, tfd, offset);
      break;
    }

    // sver 0 records are validated on the header alone; sver outside [0, 2] is treated as corruption.
    if ((pHead->sver == 0 && !walValidateChecksum(pHead)) || pHead->sver < 0 || pHead->sver > 2) {
      wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
             pHead->version, pHead->len, offset);
      code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
      if (code != TSDB_CODE_SUCCESS) {
        walFtruncate(pWal, tfd, offset);
        break;
      }
    }

    // The body must fit in the scratch buffer after the header.
    if (pHead->len < 0 || pHead->len > size - sizeof(SWalHead)) {
      wError("vgId:%d, file:%s, wal head len out of range, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
             pHead->version, pHead->len, offset);
      code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
      if (code != TSDB_CODE_SUCCESS) {
        walFtruncate(pWal, tfd, offset);
        break;
      }
    }

    // Read the body directly behind the header.
    ret = (int32_t)tfRead(tfd, pHead->cont, pHead->len);
    if (ret < 0) {
      wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno));
      code = TAOS_SYSTEM_ERROR(errno);
      break;
    }

    // Short body read: account for the header only and try the next read.
    if (ret < pHead->len) {
      wError("vgId:%d, file:%s, failed to read wal body, ret:%d len:%d", pWal->vgId, name, ret, pHead->len);
      offset += sizeof(SWalHead);
      continue;
    }

    // For sver >= 1 the checksum is validated only after the body has been read.
    if ((pHead->sver >= 1) && !walValidateChecksum(pHead)) {
      wError("vgId:%d, file:%s, wal whole cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
             pHead->version, pHead->len, offset);
      code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
      if (code != TSDB_CODE_SUCCESS) {
        walFtruncate(pWal, tfd, offset);
        break;
      }
    }

    offset = offset + sizeof(SWalHead) + pHead->len;

    wTrace("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d offset:%" PRId64,
           pWal->vgId, fileId, pHead->version, pWal->curVersion, pHead->len, offset);

    pWal->curVersion = pHead->version;

    // wInfo("writeFp: %ld", offset);
    // Hand the record to the caller-supplied callback; its result is ignored.
    (*writeFp)(pVnode, pHead);
  }

  tfClose(tfd);
  tfree(buffer);

  wDebug("vgId:%d, file:%s, it is closed after restore", pWal->vgId, name);
  return code;
}
L
Liu Jicong 已提交
553
#endif