walRead.c 14.8 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

L
Liu Jicong 已提交
16
#include "taoserror.h"
L
Liu Jicong 已提交
17
#include "walInt.h"
S
Shengliang Guan 已提交
18

L
Liu Jicong 已提交
19 20 21 22
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer);
static int32_t walFetchBodyNew(SWalReader *pRead);
static int32_t walSkipFetchBodyNew(SWalReader *pRead);

L
Liu Jicong 已提交
23
/*
 * Allocate and initialise a reader on top of `pWal`.
 *
 * pWal : WAL instance the reader is bound to (stored, not copied).
 * cond : optional filter; copied into the reader when non-NULL, otherwise
 *        scanMeta / scanUncommited / enableRef are all cleared.
 *
 * Returns the new reader, or NULL with terrno set when an allocation fails.
 * The result must be released with walCloseReader().
 */
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
  SWalReader *pRead = taosMemoryCalloc(1, sizeof(SWalReader));
  if (pRead == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // Allocate the head buffer before the mutex is initialised, so the failure
  // path only has plain memory to give back and no mutex is leaked.
  pRead->pHead = taosMemoryMalloc(sizeof(SWalCkHead));
  if (pRead->pHead == NULL) {
    terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
    taosMemoryFree(pRead);
    return NULL;
  }

  pRead->pWal = pWal;
  pRead->pIdxFile = NULL;
  pRead->pLogFile = NULL;
  // The cursor starts out invalid: the first read has to seek.
  pRead->curVersion = -1;
  pRead->curFileFirstVer = -1;
  pRead->curInvalid = 1;
  pRead->capacity = 0;

  if (cond) {
    pRead->cond = *cond;
  } else {
    pRead->cond.scanMeta = 0;
    pRead->cond.scanUncommited = 0;
    pRead->cond.enableRef = 0;
  }

  taosThreadMutexInit(&pRead->mutex, NULL);

  return pRead;
}

L
Liu Jicong 已提交
61 62 63
/*
 * Release everything owned by a reader created with walOpenReader():
 * both file handles, the head buffer, the mutex and the reader itself.
 * A NULL reader is ignored.
 */
void walCloseReader(SWalReader *pRead) {
  if (pRead == NULL) {
    return;
  }

  taosCloseFile(&pRead->pIdxFile);
  taosCloseFile(&pRead->pLogFile);
  taosMemoryFreeClear(pRead->pHead);
  // Pairs with the taosThreadMutexInit() done in walOpenReader().
  taosThreadMutexDestroy(&pRead->mutex);
  taosMemoryFree(pRead);
}

L
Liu Jicong 已提交
68 69
/*
 * Advance the reader from its current version to the next message that
 * passes the filter and load that message (head and body) into pRead->pHead.
 *
 * A message passes when it is TDMT_VND_SUBMIT, or when it is a meta message
 * and cond.scanMeta is set. Everything else is skipped.
 * The scan stops at min(appliedVer, lastVer or committedVer), where the
 * second operand depends on cond.scanUncommited.
 *
 * Returns 0 when a message was fetched. Returns -1 on a fetch/skip error
 * (curStopped stays 0) or when the end version was passed without finding
 * a message (curStopped is set to 1).
 */
int32_t walNextValidMsg(SWalReader *pRead) {
  int64_t fetchVer = pRead->curVersion;
  int64_t lastVer = walGetLastVer(pRead->pWal);
  int64_t committedVer = walGetCommittedVer(pRead->pWal);
  int64_t appliedVer = walGetAppliedVer(pRead->pWal);
  int64_t endVer = pRead->cond.scanUncommited ? lastVer : committedVer;
  endVer = TMIN(appliedVer, endVer);

  // PRId64 instead of %ld: int64_t is not `long` on every supported platform.
  wDebug("vgId:%d wal start to fetch, ver %" PRId64 ", last ver %" PRId64 " commit ver %" PRId64
         ", applied ver %" PRId64 ", end ver %" PRId64,
         pRead->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);

  pRead->curStopped = 0;
  while (fetchVer <= endVer) {
    if (walFetchHeadNew(pRead, fetchVer) < 0) {
      return -1;
    }
    if (pRead->pHead->head.msgType == TDMT_VND_SUBMIT ||
        (IS_META_MSG(pRead->pHead->head.msgType) && pRead->cond.scanMeta)) {
      if (walFetchBodyNew(pRead) < 0) {
        return -1;
      }
      return 0;
    } else {
      if (walSkipFetchBodyNew(pRead) < 0) {
        return -1;
      }
      fetchVer++;
      // The skip helper advances curVersion by one; both must stay in step.
      ASSERT(fetchVer == pRead->curVersion);
    }
  }
  pRead->curStopped = 1;
  return -1;
}
L
Liu Jicong 已提交
100

L
Liu Jicong 已提交
101
/*
 * Position the already opened log file at the record for `ver`.
 *
 * The idx file is treated as an array of SWalIdxEntry starting at
 * `fileFirstVer`; the entry for `ver` is read and its `offset` is used to
 * seek the log file.
 *
 * Returns the value of the final log-file seek on success, or -1 with terrno
 * set when a seek fails or the idx entry cannot be read completely.
 */
static int64_t walReadSeekFilePos(SWalReader *pRead, int64_t fileFirstVer, int64_t ver) {
  int64_t ret = 0;

  TdFilePtr pIdxTFile = pRead->pIdxFile;
  TdFilePtr pLogTFile = pRead->pLogFile;

  // Seek position inside the idx file. The cast keeps the arithmetic signed.
  int64_t offset = (ver - fileFirstVer) * (int64_t)sizeof(SWalIdxEntry);
  ret = taosLSeekFile(pIdxTFile, offset, SEEK_SET);
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    wError("vgId:%d, failed to seek idx file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver,
           offset, terrstr());
    return -1;
  }

  SWalIdxEntry entry = {0};
  ret = taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry));
  if (ret != (int64_t)sizeof(SWalIdxEntry)) {
    if (ret < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      wError("vgId:%d, failed to read idx file, since %s", pRead->pWal->cfg.vgId, terrstr());
    } else {
      // A short read means the entry is not fully present in the idx file.
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
      // sizeof yields size_t; cast so the argument matches PRIu64.
      wError("vgId:%d, read idx file incompletely, read bytes %" PRId64 ", bytes should be %" PRIu64,
             pRead->pWal->cfg.vgId, ret, (uint64_t)sizeof(SWalIdxEntry));
    }
    return -1;
  }

  ASSERT(entry.ver == ver);
  ret = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    wError("vgId:%d, failed to seek log file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver,
           entry.offset, terrstr());
    return -1;
  }
  return ret;
}

L
Liu Jicong 已提交
140
/*
 * Switch the reader to the log/idx file pair whose first version is
 * `fileFirstVer`.
 *
 * Any currently open pair is closed first. On success both handles are
 * installed and curFileFirstVer records which pair is open, so that
 * walReadSeekVerImpl() can skip reopening the same pair. On failure no
 * handle is installed, curFileFirstVer is -1, terrno is set and -1 is
 * returned.
 */
static int32_t walReadChangeFile(SWalReader *pRead, int64_t fileFirstVer) {
  char fnameStr[WAL_FILE_LEN];

  taosCloseFile(&pRead->pIdxFile);
  taosCloseFile(&pRead->pLogFile);
  // Nothing is open from here until both files are opened successfully.
  pRead->curFileFirstVer = -1;

  walBuildLogName(pRead->pWal, fileFirstVer, fnameStr);
  TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_READ);
  if (pLogTFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    wError("vgId:%d, cannot open file %s, since %s", pRead->pWal->cfg.vgId, fnameStr, terrstr());
    return -1;
  }

  walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr);
  TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ);
  if (pIdxTFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    wError("vgId:%d, cannot open file %s, since %s", pRead->pWal->cfg.vgId, fnameStr, terrstr());
    // Do not keep a log file without its idx file.
    taosCloseFile(&pLogTFile);
    return -1;
  }

  pRead->pLogFile = pLogTFile;
  pRead->pIdxFile = pIdxTFile;
  pRead->curFileFirstVer = fileFirstVer;
  return 0;
}

L
Liu Jicong 已提交
168
/*
 * Seek the reader to `ver` without range checking.
 *
 * Looks up the file that should contain `ver` in pWal->fileInfoSet (TD_LE
 * search keyed on firstVer), switches files when the reader is on a different
 * one, then positions the log file at the record. On success curVersion
 * becomes `ver`; curInvalid is left untouched.
 *
 * Returns 0 on success, -1 with terrno set otherwise.
 */
int32_t walReadSeekVerImpl(SWalReader *pRead, int64_t ver) {
  SWal *pWal = pRead->pWal;

  // Only firstVer is filled in as the search key; zero the rest.
  SWalFileInfo tmpInfo = {0};
  tmpInfo.firstVer = ver;

  // bsearch in fileSet
  SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
  ASSERT(pRet != NULL);
  if (pRet == NULL) {
    // Keep release builds (ASSERT compiled out) from dereferencing NULL.
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

  if (pRead->curFileFirstVer != pRet->firstVer) {
    // error code was set inner
    if (walReadChangeFile(pRead, pRet->firstVer) < 0) {
      return -1;
    }
  }

  // error code was set inner
  if (walReadSeekFilePos(pRead, pRet->firstVer, ver) < 0) {
    return -1;
  }

  wDebug("wal version reset from %" PRId64 "(invalid: %d) to %" PRId64, pRead->curVersion, pRead->curInvalid, ver);

  pRead->curVersion = ver;
  return 0;
}

/*
 * Seek the reader to `ver`, validating it against the WAL's version range.
 *
 * Nothing is done when the cursor is valid and already at `ver`. Otherwise
 * the cursor is marked invalid and curVersion is set to `ver` before any
 * check is made; curInvalid is not cleared here, even on success.
 *
 * Returns 0 on success. Returns -1 with terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
 * when `ver` is outside [firstVer, lastVer], or -1 with terrno set by the
 * underlying seek on failure.
 */
int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
  SWal *pWal = pRead->pWal;
  if (!pRead->curInvalid && ver == pRead->curVersion) {
    wDebug("wal version %" PRId64 " match, no need to reset", ver);
    return 0;
  }

  pRead->curInvalid = 1;
  pRead->curVersion = ver;

  if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
    wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId,
           ver, pWal->vers.firstVer, pWal->vers.lastVer);
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

  if (walReadSeekVerImpl(pRead, ver) < 0) {
    return -1;
  }

  return 0;
}

L
Liu Jicong 已提交
220
// Store `capacity` in the reader. Only the field is written; no buffer is
// allocated or resized here. The fetch routines compare this value with a
// record's bodyLen to decide whether the head buffer must grow.
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) {
  pRead->capacity = capacity;
}
221

L
Liu Jicong 已提交
222 223
/*
 * Read the record head for `fetchVer` into pRead->pHead.
 *
 * Seeks first when the cursor is invalid or not at `fetchVer`. If the read
 * returns 0 bytes and no seek was done in this call, the position is
 * re-established once with walReadSeekVerImpl() and the read is retried.
 *
 * On success curInvalid is cleared and 0 is returned; curVersion is not
 * advanced (the body fetch/skip helpers do that). On failure curInvalid is
 * set, terrno describes the error and -1 is returned.
 */
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
  int64_t contLen;
  bool    seeked = false;

  // fetchVer is int64_t, so it must be printed with PRId64, not %d.
  wDebug("vgId:%d, wal starts to fetch head %" PRId64, pRead->pWal->cfg.vgId, fetchVer);

  if (pRead->curInvalid || pRead->curVersion != fetchVer) {
    if (walReadSeekVer(pRead, fetchVer) < 0) {
      ASSERT(0);
      pRead->curVersion = fetchVer;
      pRead->curInvalid = 1;
      return -1;
    }
    seeked = true;
  }

  while (1) {
    contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
    if (contLen == (int64_t)sizeof(SWalCkHead)) {
      break;
    }

    if (contLen == 0 && !seeked) {
      // Nothing read at the current offset: re-seek once and try again.
      if (walReadSeekVerImpl(pRead, fetchVer) < 0) {
        // error code was set inner
        pRead->curInvalid = 1;
        return -1;
      }
      seeked = true;
      continue;
    }

    if (contLen < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
    } else {
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    }
    ASSERT(0);
    pRead->curInvalid = 1;
    return -1;
  }

  pRead->curInvalid = 0;
  return 0;
}

/*
 * Read the body that follows the head currently held in pRead->pHead.
 *
 * The head buffer is grown when pRead->capacity is smaller than the body
 * length. After the read, the version and the body checksum are verified.
 *
 * On success curVersion becomes (record version + 1) and 0 is returned.
 * On failure terrno is set and -1 is returned; every failure except an
 * allocation failure also marks the cursor invalid.
 */
static int32_t walFetchBodyNew(SWalReader *pRead) {
  SWalCont *pReadHead = &pRead->pHead->head;
  int64_t   ver = pReadHead->version;

  wDebug("vgId:%d, wal starts to fetch body %" PRId64, pRead->pWal->cfg.vgId, ver);

  if (pRead->capacity < pReadHead->bodyLen) {
    void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
    if (ptr == NULL) {
      terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
      return -1;
    }
    // The buffer may have moved: refresh the cached head pointer.
    pRead->pHead = ptr;
    pReadHead = &pRead->pHead->head;
    pRead->capacity = pReadHead->bodyLen;
  }

  // Classify the failure by the read result, not by bodyLen: a negative
  // return is an I/O error, a short read means the file is truncated.
  int64_t nRead = taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen);
  if (nRead != pReadHead->bodyLen) {
    if (nRead < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s",
             pRead->pWal->cfg.vgId, pRead->pHead->head.version, ver, tstrerror(terrno));
    } else {
      wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted",
             pRead->pWal->cfg.vgId, pRead->pHead->head.version, ver);
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    }
    pRead->curInvalid = 1;
    ASSERT(0);
    return -1;
  }

  if (pReadHead->version != ver) {
    wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
           pRead->pHead->head.version, ver);
    pRead->curInvalid = 1;
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    ASSERT(0);
    return -1;
  }

  if (walValidBodyCksum(pRead->pHead) != 0) {
    wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
    pRead->curInvalid = 1;
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    ASSERT(0);
    return -1;
  }

  wDebug("version %" PRId64 " is fetched, cursor advance", ver);
  pRead->curVersion = ver + 1;
  return 0;
}

/*
 * Skip the body of the record whose head is in pRead->pHead by seeking the
 * log file forward by head.bodyLen bytes.
 *
 * Expects a valid cursor positioned at that record (both are asserted).
 * On success curVersion is incremented and 0 is returned; on a seek failure
 * terrno is set, the cursor is marked invalid and -1 is returned.
 */
static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
  int64_t code;

  ASSERT(pRead->curVersion == pRead->pHead->head.version);
  ASSERT(pRead->curInvalid == 0);

  code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
  if (code < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    pRead->curInvalid = 1;
    ASSERT(0);
    return -1;
  }

  pRead->curVersion++;
  wDebug("version advance to %" PRId64 ", skip fetch", pRead->curVersion);

  return 0;
}

L
Liu Jicong 已提交
334
/*
 * Read the head of record `ver` into the caller-supplied `pHead` and verify
 * its checksum.
 *
 * Versions beyond the WAL's commitVer are rejected. The reader seeks first
 * when its cursor is invalid or not at `ver`. curVersion is not advanced
 * here; walFetchBody() or walSkipFetchBody() is expected to follow.
 *
 * Returns 0 on success, -1 with terrno set on failure. A failed read or
 * checksum marks the cursor invalid so that the next call seeks again.
 */
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
  int64_t code;

  // TODO: valid ver
  if (ver > pRead->pWal->vers.commitVer) {
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

  if (pRead->curInvalid || pRead->curVersion != ver) {
    code = walReadSeekVer(pRead, ver);
    // error code was set inner
    if (code < 0) return -1;
  }

  ASSERT(taosValidFile(pRead->pLogFile) == true);

  code = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead));
  if (code != (int64_t)sizeof(SWalCkHead)) {
    if (code < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
    } else {
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    }
    // The file offset is no longer at a record boundary.
    pRead->curInvalid = 1;
    return -1;
  }

  code = walValidHeadCksum(pHead);

  if (code != 0) {
    wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver);
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    pRead->curInvalid = 1;
    return -1;
  }

  return 0;
}

L
Liu Jicong 已提交
365
/*
 * Step over the body belonging to `pHead` by moving the log file forward
 * head.bodyLen bytes from the current offset.
 *
 * The reader must be positioned at pHead's version (asserted).
 * Returns 0 and increments curVersion on success. On a seek error, sets
 * terrno, marks the cursor invalid and returns -1.
 */
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
  ASSERT(pRead->curVersion == pHead->head.version);

  int64_t newPos = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR);
  if (newPos >= 0) {
    pRead->curVersion++;
    return 0;
  }

  terrno = TAOS_SYSTEM_ERROR(errno);
  pRead->curInvalid = 1;
  return -1;
}

L
Liu Jicong 已提交
382
/*
 * Read the body for the head stored in the caller-owned buffer `*ppHead`.
 *
 * When pRead->capacity is smaller than the body length the buffer is
 * reallocated and `*ppHead` is updated to the new address. The version and
 * the body checksum are verified after the read.
 *
 * On success curVersion becomes (record version + 1) and 0 is returned.
 * On failure terrno is set and -1 is returned; every failure except an
 * allocation failure also marks the cursor invalid.
 */
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
  SWalCont *pReadHead = &((*ppHead)->head);
  int64_t   ver = pReadHead->version;

  if (pRead->capacity < pReadHead->bodyLen) {
    void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
    if (ptr == NULL) {
      terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
      return -1;
    }
    *ppHead = ptr;
    pReadHead = &((*ppHead)->head);
    pRead->capacity = pReadHead->bodyLen;
  }

  int64_t nRead = taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen);
  if (nRead != pReadHead->bodyLen) {
    // Report why the read failed and force a re-seek on the next call.
    if (nRead < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
    } else {
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    }
    pRead->curInvalid = 1;
    ASSERT(0);
    return -1;
  }

  if (pReadHead->version != ver) {
    // Log the head that was actually read (*ppHead), not the reader's own buffer.
    wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
           pReadHead->version, ver);
    pRead->curInvalid = 1;
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    return -1;
  }

  if (walValidBodyCksum(*ppHead) != 0) {
    wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
    pRead->curInvalid = 1;
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    return -1;
  }

  pRead->curVersion = ver + 1;
  return 0;
}

L
Liu Jicong 已提交
421
/*
 * Read the complete record (head and body) for `ver` into pRead->pHead.
 *
 * Steps: range-check `ver` against [firstVer, lastVer]; seek when the cursor
 * is invalid or elsewhere; read the head (re-seeking once if the read returns
 * 0 bytes and no seek was done in this call); verify the head checksum; grow
 * the buffer if needed; read the body; verify version and body checksum.
 *
 * Returns 0 on success and increments curVersion. Returns -1 with terrno set
 * on failure; failures after the seek mark the cursor invalid so the next
 * call re-seeks instead of reading from a stale file offset.
 */
int32_t walReadVer(SWalReader *pRead, int64_t ver) {
  wDebug("vgId:%d wal start to read ver %" PRId64, pRead->pWal->cfg.vgId, ver);
  int64_t contLen;
  bool    seeked = false;

  if (pRead->pWal->vers.firstVer == -1) {
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

  if (ver > pRead->pWal->vers.lastVer || ver < pRead->pWal->vers.firstVer) {
    wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId,
           ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.lastVer);
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

  if (pRead->curInvalid || pRead->curVersion != ver) {
    if (walReadSeekVer(pRead, ver) < 0) {
      wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, terrstr());
      return -1;
    }
    seeked = true;
  }

  while (1) {
    contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
    if (contLen == (int64_t)sizeof(SWalCkHead)) {
      break;
    }

    if (contLen == 0 && !seeked) {
      // Nothing read at the current offset: re-seek once and try again.
      if (walReadSeekVerImpl(pRead, ver) < 0) {
        // error code was set inner
        pRead->curInvalid = 1;
        return -1;
      }
      seeked = true;
      continue;
    }

    if (contLen < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
    } else {
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    }
    pRead->curInvalid = 1;
    ASSERT(0);
    return -1;
  }

  contLen = walValidHeadCksum(pRead->pHead);
  if (contLen != 0) {
    wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId,
           ver);
    pRead->curInvalid = 1;
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    return -1;
  }

  if (pRead->capacity < pRead->pHead->head.bodyLen) {
    void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pRead->pHead->head.bodyLen);
    if (ptr == NULL) {
      // The head was consumed but the body was not: force a re-seek.
      pRead->curInvalid = 1;
      terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
      return -1;
    }
    pRead->pHead = ptr;
    pRead->capacity = pRead->pHead->head.bodyLen;
  }

  contLen = taosReadFile(pRead->pLogFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen);
  if (contLen != pRead->pHead->head.bodyLen) {
    pRead->curInvalid = 1;
    if (contLen < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
    } else {
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
      ASSERT(0);
    }
    return -1;
  }

  if (pRead->pHead->head.version != ver) {
    wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
           pRead->pHead->head.version, ver);
    pRead->curInvalid = 1;
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    ASSERT(0);
    return -1;
  }

  contLen = walValidBodyCksum(pRead->pHead);
  if (contLen != 0) {
    wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId,
           ver);
    pRead->curInvalid = 1;
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    ASSERT(0);
    return -1;
  }
  pRead->curVersion++;

  return 0;
}