walRead.c 15.7 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

L
Liu Jicong 已提交
16
#include "taoserror.h"
L
Liu Jicong 已提交
17
#include "walInt.h"
S
Shengliang Guan 已提交
18

L
Liu Jicong 已提交
19
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
20 21
  SWalReader *pReader = taosMemoryCalloc(1, sizeof(SWalReader));
  if (pReader == NULL) {
22
    terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
23 24
    return NULL;
  }
25

26 27 28 29 30 31 32
  pReader->pWal = pWal;
  pReader->readerId = tGenIdPI64();
  pReader->pIdxFile = NULL;
  pReader->pLogFile = NULL;
  pReader->curVersion = -1;
  pReader->curFileFirstVer = -1;
  pReader->capacity = 0;
L
Liu Jicong 已提交
33
  if (cond) {
34
    pReader->cond = *cond;
L
Liu Jicong 已提交
35
  } else {
36
//    pReader->cond.scanUncommited = 0;
37 38 39
    pReader->cond.scanNotApplied = 0;
    pReader->cond.scanMeta = 0;
    pReader->cond.enableRef = 0;
L
Liu Jicong 已提交
40
  }
L
fix  
Liu Jicong 已提交
41

42
  taosThreadMutexInit(&pReader->mutex, NULL);
L
fix  
Liu Jicong 已提交
43

44 45
  pReader->pHead = taosMemoryMalloc(sizeof(SWalCkHead));
  if (pReader->pHead == NULL) {
S
Shengliang Guan 已提交
46
    terrno = TSDB_CODE_OUT_OF_MEMORY;
47
    taosMemoryFree(pReader);
L
Liu Jicong 已提交
48 49
    return NULL;
  }
L
Liu Jicong 已提交
50

51
  /*if (pReader->cond.enableRef) {*/
L
Liu Jicong 已提交
52
  /* taosHashPut(pWal->pRefHash, &pReader->readerId, sizeof(int64_t), &pReader, sizeof(void *));*/
53 54 55
  /*}*/

  return pReader;
L
Liu Jicong 已提交
56 57
}

58 59 60 61 62
void walCloseReader(SWalReader *pReader) {
  taosCloseFile(&pReader->pIdxFile);
  taosCloseFile(&pReader->pLogFile);
  taosMemoryFreeClear(pReader->pHead);
  taosMemoryFree(pReader);
L
Liu Jicong 已提交
63 64
}

65 66 67 68
int32_t walNextValidMsg(SWalReader *pReader) {
  int64_t fetchVer = pReader->curVersion;
  int64_t lastVer = walGetLastVer(pReader->pWal);
  int64_t committedVer = walGetCommittedVer(pReader->pWal);
69
  int64_t appliedVer = walGetAppliedVer(pReader->pWal);
L
Liu Jicong 已提交
70

S
Shengliang Guan 已提交
71
  wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
72 73
         ", applied index:%" PRId64,
         pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer);
74 75 76 77
  if (fetchVer > appliedVer){
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }
78
  while (fetchVer <= appliedVer) {
wmmhello's avatar
wmmhello 已提交
79
    if (walFetchHead(pReader, fetchVer) < 0) {
L
Liu Jicong 已提交
80 81
      return -1;
    }
82

83 84 85
    int32_t type = pReader->pHead->head.msgType;
    if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) ||
        (IS_META_MSG(type) && pReader->cond.scanMeta)) {
wmmhello's avatar
wmmhello 已提交
86
      if (walFetchBody(pReader) < 0) {
L
Liu Jicong 已提交
87 88 89 90
        return -1;
      }
      return 0;
    } else {
wmmhello's avatar
wmmhello 已提交
91
      if (walSkipFetchBody(pReader) < 0) {
L
Liu Jicong 已提交
92 93
        return -1;
      }
94

wmmhello's avatar
wmmhello 已提交
95
      fetchVer = pReader->curVersion;
L
Liu Jicong 已提交
96 97
    }
  }
98

L
Liu Jicong 已提交
99
  return -1;
L
Liu Jicong 已提交
100
}
L
Liu Jicong 已提交
101

102
int64_t walReaderGetCurrentVer(const SWalReader *pReader) { return pReader->curVersion; }
103
int64_t walReaderGetValidFirstVer(const SWalReader *pReader) { return walGetFirstVer(pReader->pWal); }
104 105 106 107 108 109 110 111
void    walReaderSetSkipToVersion(SWalReader *pReader, int64_t ver) { atomic_store_64(&pReader->skipToVersion, ver); }

// this function is NOT multi-thread safe, and no need to be.
int64_t walReaderGetSkipToVersion(SWalReader *pReader) {
  int64_t newVersion = pReader->skipToVersion;
  pReader->skipToVersion = 0;
  return newVersion;
}
112

113 114 115 116 117 118 119
void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *ever) {
  *sver = walGetFirstVer(pReader->pWal);
  int64_t lastVer = walGetLastVer(pReader->pWal);
  int64_t committedVer = walGetCommittedVer(pReader->pWal);
  *ever = pReader->cond.scanUncommited ? lastVer : committedVer;
}

120 121 122 123 124 125
void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset){
  // if offset version is small than first version , let's seek to first version
  taosThreadMutexLock(&pWalReader->pWal->mutex);
  int64_t firstVer = walGetFirstVer((pWalReader)->pWal);
  taosThreadMutexUnlock(&pWalReader->pWal->mutex);

126 127
  if (pOffset->version < firstVer){
    pOffset->version = firstVer;
128 129 130
  }
}

131
static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) {
L
Liu Jicong 已提交
132
  int64_t ret = 0;
L
Liu Jicong 已提交
133

134 135
  TdFilePtr pIdxTFile = pReader->pIdxFile;
  TdFilePtr pLogTFile = pReader->pLogFile;
L
Liu Jicong 已提交
136 137

  // seek position
L
Liu Jicong 已提交
138
  int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry);
L
Liu Jicong 已提交
139 140
  ret = taosLSeekFile(pIdxTFile, offset, SEEK_SET);
  if (ret < 0) {
L
Liu Jicong 已提交
141
    terrno = TAOS_SYSTEM_ERROR(errno);
142 143
    wError("vgId:%d, failed to seek idx file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pReader->pWal->cfg.vgId,
           ver, offset, terrstr());
L
Liu Jicong 已提交
144 145
    return -1;
  }
L
Liu Jicong 已提交
146
  SWalIdxEntry entry = {0};
L
Liu Jicong 已提交
147 148 149
  if ((ret = taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry))) != sizeof(SWalIdxEntry)) {
    if (ret < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
150
      wError("vgId:%d, failed to read idx file, since %s", pReader->pWal->cfg.vgId, terrstr());
L
Liu Jicong 已提交
151 152
    } else {
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
153
      wError("vgId:%d, read idx file incompletely, read bytes %" PRId64 ", bytes should be %ld",
154
             pReader->pWal->cfg.vgId, ret, sizeof(SWalIdxEntry));
L
Liu Jicong 已提交
155
    }
L
Liu Jicong 已提交
156 157
    return -1;
  }
L
Liu Jicong 已提交
158 159 160

  ret = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
  if (ret < 0) {
L
Liu Jicong 已提交
161
    terrno = TAOS_SYSTEM_ERROR(errno);
162 163
    wError("vgId:%d, failed to seek log file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pReader->pWal->cfg.vgId,
           ver, entry.offset, terrstr());
L
Liu Jicong 已提交
164 165
    return -1;
  }
L
Liu Jicong 已提交
166
  return ret;
L
Liu Jicong 已提交
167 168
}

169
static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) {
wmmhello's avatar
wmmhello 已提交
170
  char fnameStr[WAL_FILE_LEN] = {0};
L
Liu Jicong 已提交
171

172 173
  taosCloseFile(&pReader->pIdxFile);
  taosCloseFile(&pReader->pLogFile);
L
Liu Jicong 已提交
174

175 176 177
  walBuildLogName(pReader->pWal, fileFirstVer, fnameStr);
  TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_READ);
  if (pLogFile == NULL) {
L
Liu Jicong 已提交
178
    terrno = TAOS_SYSTEM_ERROR(errno);
179
    wError("vgId:%d, cannot open file %s, since %s", pReader->pWal->cfg.vgId, fnameStr, terrstr());
L
Liu Jicong 已提交
180 181 182
    return -1;
  }

183
  pReader->pLogFile = pLogFile;
L
Liu Jicong 已提交
184

185 186 187
  walBuildIdxName(pReader->pWal, fileFirstVer, fnameStr);
  TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_READ);
  if (pIdxFile == NULL) {
L
Liu Jicong 已提交
188
    terrno = TAOS_SYSTEM_ERROR(errno);
189
    wError("vgId:%d, cannot open file %s, since %s", pReader->pWal->cfg.vgId, fnameStr, terrstr());
L
Liu Jicong 已提交
190 191 192
    return -1;
  }

193
  pReader->pIdxFile = pIdxFile;
194 195 196

  pReader->curFileFirstVer = fileFirstVer;

L
Liu Jicong 已提交
197 198 199
  return 0;
}

200 201
int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
  SWal *pWal = pReader->pWal;
L
Liu Jicong 已提交
202

203
  // bsearch in fileSet
L
Liu Jicong 已提交
204
  SWalFileInfo tmpInfo;
L
Liu Jicong 已提交
205
  tmpInfo.firstVer = ver;
L
Liu Jicong 已提交
206
  SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
207
  if (pRet == NULL) {
208
    wError("failed to find WAL log file with ver:%" PRId64, ver);
209 210 211
    terrno = TSDB_CODE_WAL_INVALID_VER;
    return -1;
  }
212

213
  if (pReader->curFileFirstVer != pRet->firstVer) {
L
Liu Jicong 已提交
214
    // error code was set inner
215
    if (walReadChangeFile(pReader, pRet->firstVer) < 0) {
L
Liu Jicong 已提交
216 217 218 219
      return -1;
    }
  }

L
Liu Jicong 已提交
220
  // error code was set inner
221
  if (walReadSeekFilePos(pReader, pRet->firstVer, ver) < 0) {
L
Liu Jicong 已提交
222 223
    return -1;
  }
L
Liu Jicong 已提交
224

225 226
  wDebug("vgId:%d, wal version reset from %" PRId64 " to %" PRId64, pReader->pWal->cfg.vgId,
         pReader->curVersion, ver);
L
Liu Jicong 已提交
227

228
  pReader->curVersion = ver;
L
Liu Jicong 已提交
229 230 231
  return 0;
}

232
int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) {
233
  SWal *pWal = pReader->pWal;
234
  if (ver == pReader->curVersion) {
S
Shengliang Guan 已提交
235
    wDebug("vgId:%d, wal index:%" PRId64 " match, no need to reset", pReader->pWal->cfg.vgId, ver);
L
Liu Jicong 已提交
236 237 238 239
    return 0;
  }

  if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
240
    wInfo("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
L
Liu Jicong 已提交
241 242 243 244 245
           ver, pWal->vers.firstVer, pWal->vers.lastVer);
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

246
  if (walReadSeekVerImpl(pReader, ver) < 0) {
L
Liu Jicong 已提交
247 248 249
    return -1;
  }

L
Liu Jicong 已提交
250 251 252
  return 0;
}

253

wmmhello's avatar
wmmhello 已提交
254
int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
L
Liu Jicong 已提交
255
  int64_t code;
256 257 258
  int64_t contLen;
  bool    seeked = false;

S
Shengliang Guan 已提交
259
  wDebug("vgId:%d, try to fetch ver %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
L
Liu Jicong 已提交
260 261 262
         ", applied ver:%" PRId64,
         pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer,
         pRead->pWal->vers.appliedVer);
L
Liu Jicong 已提交
263

264
  // TODO: valid ver
265 266 267
  if (ver > pRead->pWal->vers.commitVer) {
    return -1;
  }
268

269
  if (pRead->curVersion != ver) {
270
    code = walReaderSeekVer(pRead, ver);
271 272 273 274
    if (code < 0) {
      return -1;
    }
    seeked = true;
275 276
  }

277
  while (1) {
wmmhello's avatar
wmmhello 已提交
278
    contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
279 280 281
    if (contLen == sizeof(SWalCkHead)) {
      break;
    } else if (contLen == 0 && !seeked) {
282 283 284
      if(walReadSeekVerImpl(pRead, ver) < 0){
        return -1;
      }
285 286 287 288 289 290 291 292 293 294
      seeked = true;
      continue;
    } else {
      if (contLen < 0) {
        terrno = TAOS_SYSTEM_ERROR(errno);
      } else {
        terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
      }
      return -1;
    }
295 296
  }

wmmhello's avatar
wmmhello 已提交
297
  code = walValidHeadCksum(pRead->pHead);
298 299

  if (code != 0) {
L
Liu Jicong 已提交
300
    wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver);
301 302 303 304 305 306 307
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    return -1;
  }

  return 0;
}

wmmhello's avatar
wmmhello 已提交
308
int32_t walSkipFetchBody(SWalReader *pRead) {
S
Shengliang Guan 已提交
309
  wDebug("vgId:%d, skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
L
Liu Jicong 已提交
310
         ", applied ver:%" PRId64,
wmmhello's avatar
wmmhello 已提交
311
         pRead->pWal->cfg.vgId, pRead->pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
L
Liu Jicong 已提交
312 313
         pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer);

wmmhello's avatar
wmmhello 已提交
314
  int64_t code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
315 316 317 318 319 320 321 322 323
  if (code < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  pRead->curVersion++;
  return 0;
}

wmmhello's avatar
wmmhello 已提交
324 325
int32_t walFetchBody(SWalReader *pRead) {
  SWalCont *pReadHead = &pRead->pHead->head;
L
Liu Jicong 已提交
326
  int64_t   ver = pReadHead->version;
327

S
Shengliang Guan 已提交
328
  wDebug("vgId:%d, fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
L
Liu Jicong 已提交
329 330 331 332
         ", applied ver:%" PRId64,
         pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer,
         pRead->pWal->vers.appliedVer);

333
  if (pRead->capacity < pReadHead->bodyLen) {
wmmhello's avatar
wmmhello 已提交
334
    SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
335
    if (ptr == NULL) {
S
Shengliang Guan 已提交
336
      terrno = TSDB_CODE_OUT_OF_MEMORY;
337 338
      return -1;
    }
wmmhello's avatar
wmmhello 已提交
339 340
    pRead->pHead = ptr;
    pReadHead = &pRead->pHead->head;
341 342 343
    pRead->capacity = pReadHead->bodyLen;
  }

L
Liu Jicong 已提交
344
  if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) {
345 346 347 348 349 350 351 352 353
    if (pReadHead->bodyLen < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s",
             pRead->pWal->cfg.vgId, pReadHead->version, ver, tstrerror(terrno));
    } else {
      wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted",
             pRead->pWal->cfg.vgId, pReadHead->version, ver);
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    }
354 355 356 357
    return -1;
  }

  if (pReadHead->version != ver) {
S
Shengliang Guan 已提交
358
    wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
359
           pReadHead->version, ver);
360 361 362 363
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    return -1;
  }

wmmhello's avatar
wmmhello 已提交
364
  if (walValidBodyCksum(pRead->pHead) != 0) {
365 366
    wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId,
           ver);
367 368 369 370
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    return -1;
  }

wmmhello's avatar
wmmhello 已提交
371
  pRead->curVersion++;
372 373 374
  return 0;
}

375
int32_t walReadVer(SWalReader *pReader, int64_t ver) {
S
Shengliang Guan 已提交
376
  wDebug("vgId:%d, wal start to read index:%" PRId64, pReader->pWal->cfg.vgId, ver);
L
Liu Jicong 已提交
377
  int64_t contLen;
378
  int32_t code;
L
Liu Jicong 已提交
379
  bool    seeked = false;
380

381
  if (walIsEmpty(pReader->pWal)) {
382 383 384 385
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

386 387 388
  if (ver > pReader->pWal->vers.lastVer || ver < pReader->pWal->vers.firstVer) {
    wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
           ver, pReader->pWal->vers.firstVer, pReader->pWal->vers.lastVer);
M
Minghao Li 已提交
389 390 391 392
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

393 394
  taosThreadMutexLock(&pReader->mutex);

395
  if (pReader->curVersion != ver) {
396
    if (walReaderSeekVer(pReader, ver) < 0) {
397
      wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr());
398
      taosThreadMutexUnlock(&pReader->mutex);
L
Liu Jicong 已提交
399 400 401 402
      return -1;
    }
    seeked = true;
  }
L
Liu Jicong 已提交
403

L
Liu Jicong 已提交
404
  while (1) {
405
    contLen = taosReadFile(pReader->pLogFile, pReader->pHead, sizeof(SWalCkHead));
L
Liu Jicong 已提交
406 407 408
    if (contLen == sizeof(SWalCkHead)) {
      break;
    } else if (contLen == 0 && !seeked) {
409 410 411 412
      if(walReadSeekVerImpl(pReader, ver) < 0){
        taosThreadMutexUnlock(&pReader->mutex);
        return -1;
      }
L
Liu Jicong 已提交
413 414 415 416 417 418 419 420
      seeked = true;
      continue;
    } else {
      if (contLen < 0) {
        terrno = TAOS_SYSTEM_ERROR(errno);
      } else {
        terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
      }
421 422
      wError("vgId:%d, failed to read WAL record head, index:%" PRId64 ", from log file since %s",
             pReader->pWal->cfg.vgId, ver, terrstr());
423
      taosThreadMutexUnlock(&pReader->mutex);
L
Liu Jicong 已提交
424
      return -1;
M
Minghao Li 已提交
425
    }
L
Liu Jicong 已提交
426
  }
427

428 429 430
  code = walValidHeadCksum(pReader->pHead);
  if (code != 0) {
    wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pReader->pWal->cfg.vgId,
431
           ver);
L
Liu Jicong 已提交
432
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
433
    taosThreadMutexUnlock(&pReader->mutex);
L
Liu Jicong 已提交
434 435
    return -1;
  }
436

437
  if (pReader->capacity < pReader->pHead->head.bodyLen) {
L
Liu Jicong 已提交
438 439
    SWalCkHead *ptr =
        (SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReader->pHead->head.bodyLen);
L
Liu Jicong 已提交
440
    if (ptr == NULL) {
S
Shengliang Guan 已提交
441
      terrno = TSDB_CODE_OUT_OF_MEMORY;
442
      taosThreadMutexUnlock(&pReader->mutex);
L
Liu Jicong 已提交
443 444
      return -1;
    }
L
Liu Jicong 已提交
445
    pReader->pHead = ptr;
446
    pReader->capacity = pReader->pHead->head.bodyLen;
L
Liu Jicong 已提交
447
  }
L
Liu Jicong 已提交
448

449 450
  if ((contLen = taosReadFile(pReader->pLogFile, pReader->pHead->head.body, pReader->pHead->head.bodyLen)) !=
      pReader->pHead->head.bodyLen) {
L
Liu Jicong 已提交
451
    if (contLen < 0)
L
Liu Jicong 已提交
452
      terrno = TAOS_SYSTEM_ERROR(errno);
M
Minghao Li 已提交
453
    else {
L
Liu Jicong 已提交
454
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
M
Minghao Li 已提交
455
    }
456 457
    wError("vgId:%d, failed to read WAL record body, index:%" PRId64 ", from log file since %s",
           pReader->pWal->cfg.vgId, ver, terrstr());
458
    taosThreadMutexUnlock(&pReader->mutex);
L
Liu Jicong 已提交
459 460
    return -1;
  }
L
Liu Jicong 已提交
461

462 463 464
  if (pReader->pHead->head.version != ver) {
    wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pReader->pWal->cfg.vgId,
           pReader->pHead->head.version, ver);
465
//    pReader->curInvalid = 1;
L
Liu Jicong 已提交
466
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
467
    taosThreadMutexUnlock(&pReader->mutex);
L
Liu Jicong 已提交
468 469
    return -1;
  }
L
Liu Jicong 已提交
470

471 472 473
  code = walValidBodyCksum(pReader->pHead);
  if (code != 0) {
    wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since body checksum not passed", pReader->pWal->cfg.vgId,
474
           ver);
475 476
    uint32_t readCkSum = walCalcBodyCksum(pReader->pHead->head.body, pReader->pHead->head.bodyLen);
    uint32_t logCkSum = pReader->pHead->cksumBody;
S
Shengliang Guan 已提交
477
    wError("checksum written into log:%u, checksum calculated:%u", logCkSum, readCkSum);
478
//    pReader->curInvalid = 1;
L
Liu Jicong 已提交
479
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
480
    taosThreadMutexUnlock(&pReader->mutex);
L
Liu Jicong 已提交
481 482
    return -1;
  }
483
  pReader->curVersion++;
L
Liu Jicong 已提交
484

485 486
  taosThreadMutexUnlock(&pReader->mutex);

L
Liu Jicong 已提交
487 488
  return 0;
}
489 490 491 492 493 494

void walReadReset(SWalReader *pReader) {
  taosThreadMutexLock(&pReader->mutex);
  taosCloseFile(&pReader->pIdxFile);
  taosCloseFile(&pReader->pLogFile);
  pReader->curFileFirstVer = -1;
495
  pReader->curVersion = -1;
496 497
  taosThreadMutexUnlock(&pReader->mutex);
}