walRead.c 18.6 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "taoserror.h"
L
Liu Jicong 已提交
17
#include "walInt.h"
S
Shengliang Guan 已提交
18

L
Liu Jicong 已提交
19 20 21 22
// Internal helpers used by walNextValidMsg(); they operate on the reader's own head buffer (pRead->pHead).
// walFetchHeadNew:     read the record head of version fetchVer into pRead->pHead.
// walFetchBodyNew:     read the body that follows the head currently held in pRead->pHead.
// walSkipFetchBodyNew: seek past that body without reading it.
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer);
static int32_t walFetchBodyNew(SWalReader *pRead);
static int32_t walSkipFetchBodyNew(SWalReader *pRead);

L
Liu Jicong 已提交
23
/*
 * Create a reader over pWal.
 *
 * pWal: the WAL to read from; stored in the reader, not copied.
 * cond: optional filter; copied by value when non-NULL, otherwise every filter flag is cleared.
 *
 * Returns the new reader, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when an allocation fails.
 * The reader starts with no file open and an invalid cursor (curInvalid = 1, curVersion = -1).
 * Release it with walCloseReader().
 */
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
  SWalReader *pReader = taosMemoryCalloc(1, sizeof(SWalReader));
  if (pReader == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // Allocate the head buffer before the mutex is initialised, so that the
  // failure path only has plain memory to release (no mutex left behind).
  pReader->pHead = taosMemoryMalloc(sizeof(SWalCkHead));
  if (pReader->pHead == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pReader);
    return NULL;
  }

  pReader->pWal = pWal;
  pReader->readerId = tGenIdPI64();
  pReader->pIdxFile = NULL;
  pReader->pLogFile = NULL;
  pReader->curVersion = -1;
  pReader->curFileFirstVer = -1;
  pReader->curInvalid = 1;
  pReader->capacity = 0;  // no room for a body yet; the buffer only holds a head

  if (cond) {
    pReader->cond = *cond;
  } else {
    pReader->cond.scanUncommited = 0;
    pReader->cond.scanNotApplied = 0;
    pReader->cond.scanMeta = 0;
    pReader->cond.enableRef = 0;
  }

  taosThreadMutexInit(&pReader->mutex, NULL);

  // NOTE: cond.enableRef is recorded, but the reader is not registered in
  // pWal->pRefHash here (that registration was disabled in the original code).

  return pReader;
}

63 64 65 66 67 68 69 70
/*
 * Destroy a reader created by walOpenReader(): close its index and log files,
 * destroy its mutex and free the head buffer and the reader itself.
 *
 * pReader: reader to close; NULL is accepted and ignored.
 * The pointer must not be used after this call.
 */
void walCloseReader(SWalReader *pReader) {
  if (pReader == NULL) {
    return;
  }

  taosCloseFile(&pReader->pIdxFile);
  taosCloseFile(&pReader->pLogFile);

  // Pair the taosThreadMutexInit() done in walOpenReader().
  taosThreadMutexDestroy(&pReader->mutex);

  taosMemoryFreeClear(pReader->pHead);
  taosMemoryFree(pReader);
}

73 74 75 76 77 78
/*
 * Advance the reader from its current version to the next record that passes
 * the reader's filter and load that record (head and body) into pReader->pHead.
 *
 * A record is accepted when it is a TDMT_VND_SUBMIT message, or a meta message
 * while cond.scanMeta is set; every other record is skipped.
 * The scan stops at min(applied version, last version or committed version),
 * where the second term depends on cond.scanUncommited.
 *
 * Returns 0 when a record was loaded. Returns -1 on a read error, or when the
 * end version was reached without a match; in the latter case curStopped is 1.
 */
int32_t walNextValidMsg(SWalReader *pReader) {
  int64_t fetchVer = pReader->curVersion;
  int64_t lastVer = walGetLastVer(pReader->pWal);
  int64_t committedVer = walGetCommittedVer(pReader->pWal);
  int64_t appliedVer = walGetAppliedVer(pReader->pWal);

  int64_t endVer;
  if (pReader->cond.scanUncommited) {
    endVer = lastVer;
  } else {
    endVer = committedVer;
  }
  endVer = TMIN(appliedVer, endVer);

  wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
         ", applied index:%" PRId64 ", end index:%" PRId64,
         pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);

  pReader->curStopped = 0;

  for (; fetchVer <= endVer;) {
    if (walFetchHeadNew(pReader, fetchVer) < 0) {
      return -1;
    }

    // pHead may be reallocated by the body fetch, so it is always read through pReader.
    bool wanted = pReader->pHead->head.msgType == TDMT_VND_SUBMIT ||
                  (IS_META_MSG(pReader->pHead->head.msgType) && pReader->cond.scanMeta);
    if (wanted) {
      return (walFetchBodyNew(pReader) < 0) ? -1 : 0;
    }

    // Not a record the caller asked for: step over its body and try the next version.
    if (walSkipFetchBodyNew(pReader) < 0) {
      return -1;
    }
    fetchVer++;
    ASSERT(fetchVer == pReader->curVersion);
  }

  // Reached the end version without finding a matching record.
  pReader->curStopped = 1;
  return -1;
}
L
Liu Jicong 已提交
106

107
/*
 * Position the reader's log file at the record of version `ver`.
 *
 * The index file is treated as an array of SWalIdxEntry starting at
 * fileFirstVer: entry (ver - fileFirstVer) is read and the log file is then
 * seeked to the offset stored in it.
 *
 * pReader:      reader whose pIdxFile / pLogFile are already open.
 * fileFirstVer: first version stored in the currently opened file pair.
 * ver:          version to locate.
 *
 * Returns the result of the final log-file seek (>= 0) on success, or -1 with
 * terrno set on a seek/read failure or when the index entry does not belong
 * to `ver` (TSDB_CODE_WAL_FILE_CORRUPTED).
 */
static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) {
  int64_t ret = 0;

  TdFilePtr pIdxTFile = pReader->pIdxFile;
  TdFilePtr pLogTFile = pReader->pLogFile;

  // seek position; keep the arithmetic signed so a bad (ver < fileFirstVer)
  // yields a negative offset instead of a huge unsigned one
  int64_t offset = (ver - fileFirstVer) * (int64_t)sizeof(SWalIdxEntry);
  ret = taosLSeekFile(pIdxTFile, offset, SEEK_SET);
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    wError("vgId:%d, failed to seek idx file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pReader->pWal->cfg.vgId,
           ver, offset, terrstr());
    return -1;
  }

  SWalIdxEntry entry = {0};
  ret = taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry));
  if (ret != (int64_t)sizeof(SWalIdxEntry)) {
    if (ret < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      wError("vgId:%d, failed to read idx file, since %s", pReader->pWal->cfg.vgId, terrstr());
    } else {
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
      // sizeof yields size_t; print it through int64_t so the specifier matches on every platform
      wError("vgId:%d, read idx file incompletely, read bytes %" PRId64 ", bytes should be %" PRId64,
             pReader->pWal->cfg.vgId, ret, (int64_t)sizeof(SWalIdxEntry));
    }
    return -1;
  }

  // The entry must describe the requested version; anything else means the
  // index file does not match the expected layout.
  if (entry.ver != ver) {
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    wError("vgId:%d, idx entry version mismatch, index:%" PRId64 ", read index:%" PRId64, pReader->pWal->cfg.vgId,
           ver, (int64_t)entry.ver);
    return -1;
  }

  ret = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    wError("vgId:%d, failed to seek log file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pReader->pWal->cfg.vgId,
           ver, entry.offset, terrstr());
    return -1;
  }
  return ret;
}

146
/*
 * Switch the reader to the log/index file pair whose first version is
 * fileFirstVer.
 *
 * The currently opened pair is closed first. On success both new handles are
 * stored in the reader and curFileFirstVer is updated.
 * On failure the reader is left with no usable pair and curFileFirstVer = -1,
 * so that a later seek to the old file re-opens it instead of using closed
 * handles.
 *
 * Returns 0 on success, -1 with terrno set when either file cannot be opened.
 */
static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) {
  char fnameStr[WAL_FILE_LEN];

  taosCloseFile(&pReader->pIdxFile);
  taosCloseFile(&pReader->pLogFile);
  // Nothing is open any more: forget which file the reader was positioned in.
  pReader->curFileFirstVer = -1;

  walBuildLogName(pReader->pWal, fileFirstVer, fnameStr);
  TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_READ);
  if (pLogFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    wError("vgId:%d, cannot open file %s, since %s", pReader->pWal->cfg.vgId, fnameStr, terrstr());
    return -1;
  }

  walBuildIdxName(pReader->pWal, fileFirstVer, fnameStr);
  TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_READ);
  if (pIdxFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    wError("vgId:%d, cannot open file %s, since %s", pReader->pWal->cfg.vgId, fnameStr, terrstr());
    // Do not keep half of a pair: release the log file opened just above.
    taosCloseFile(&pLogFile);
    return -1;
  }

  // Publish the new pair only once both files are open.
  pReader->pLogFile = pLogFile;
  pReader->pIdxFile = pIdxFile;
  pReader->curFileFirstVer = fileFirstVer;

  return 0;
}

177 178
/*
 * Unconditionally position the reader at version `ver`.
 *
 * Looks up (TD_LE search on firstVer) the file in pWal->fileInfoSet that
 * should contain `ver`, switches to that file pair if it is not the one
 * currently open, then seeks the log file to the record.
 * No range check against the WAL's first/last version is done here.
 *
 * Returns 0 and sets curVersion = ver on success. Returns -1 with terrno set
 * (TSDB_CODE_WAL_INVALID_VER when no file is found, otherwise set by the
 * helper that failed).
 */
int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
  SWal *pWal = pReader->pWal;

  // binary search in the file set; only firstVer of the key is filled in
  SWalFileInfo key;
  key.firstVer = ver;
  SWalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &key, compareWalFileInfo, TD_LE);
  if (pInfo == NULL) {
    wError("failed to find WAL log file with ver:%" PRId64, ver);
    terrno = TSDB_CODE_WAL_INVALID_VER;
    return -1;
  }

  // Open the containing file pair unless it is already the current one.
  // (terrno is set by walReadChangeFile on failure)
  bool needSwitch = (pReader->curFileFirstVer != pInfo->firstVer);
  if (needSwitch && walReadChangeFile(pReader, pInfo->firstVer) < 0) {
    return -1;
  }

  // (terrno is set by walReadSeekFilePos on failure)
  if (walReadSeekFilePos(pReader, pInfo->firstVer, ver) < 0) {
    return -1;
  }

  wDebug("vgId:%d, wal version reset from %" PRId64 "(invalid:%d) to %" PRId64, pReader->pWal->cfg.vgId,
         pReader->curVersion, pReader->curInvalid, ver);

  pReader->curVersion = ver;
  return 0;
}

208 209 210
/*
 * Position the reader at version `ver`, skipping the work when the cursor is
 * valid and already there.
 *
 * Otherwise the cursor is marked invalid and curVersion is set to `ver`
 * before any validation. A version outside [firstVer, lastVer] of the WAL
 * fails with TSDB_CODE_WAL_LOG_NOT_EXIST. Versions below vers.snapshotVer get
 * no special treatment.
 *
 * Returns 0 on success, -1 with terrno set on failure.
 * Note: this function never clears curInvalid; callers do that after a
 * successful read.
 */
int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) {
  SWal *pWal = pReader->pWal;

  bool alreadyThere = !pReader->curInvalid && ver == pReader->curVersion;
  if (alreadyThere) {
    wDebug("vgId:%d, wal index:%" PRId64 " match, no need to reset", pReader->pWal->cfg.vgId, ver);
    return 0;
  }

  pReader->curInvalid = 1;
  pReader->curVersion = ver;

  bool outOfRange = ver > pWal->vers.lastVer || ver < pWal->vers.firstVer;
  if (outOfRange) {
    wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
           ver, pWal->vers.firstVer, pWal->vers.lastVer);
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

  return (walReadSeekVerImpl(pReader, ver) < 0) ? -1 : 0;
}

L
Liu Jicong 已提交
234
/*
 * Record `capacity` as the reader's body capacity.
 *
 * Only the field is written; pRead->pHead is not resized here. The fetch
 * functions in this file compare capacity with a record's bodyLen to decide
 * whether the head buffer must be reallocated.
 * NOTE(review): presumably the caller guarantees the buffer really has this
 * much room for a body -- confirm against callers.
 */
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) {
  pRead->capacity = capacity;
}
235

L
Liu Jicong 已提交
236 237
/*
 * Read the record head of version fetchVer into pRead->pHead.
 *
 * If the cursor is invalid or not at fetchVer, the reader is seeked first.
 * When the head read returns 0 bytes and no seek has happened yet in this
 * call, the reader is re-seeked once with walReadSeekVerImpl() and the read
 * is retried.
 *
 * Returns 0 and clears curInvalid on success. Returns -1 with terrno set and
 * curInvalid = 1 on failure. The head checksum is not verified here.
 */
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
  int64_t contLen;
  bool    seeked = false;

  wDebug("vgId:%d, wal starts to fetch head, index:%" PRId64, pRead->pWal->cfg.vgId, fetchVer);

  if (pRead->curInvalid || pRead->curVersion != fetchVer) {
    if (walReadSeekVer(pRead, fetchVer) < 0) {
      ASSERT(0);
      pRead->curVersion = fetchVer;
      pRead->curInvalid = 1;
      return -1;
    }
    seeked = true;
  }

  while (1) {
    contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
    if (contLen == (int64_t)sizeof(SWalCkHead)) {
      break;
    }

    if (contLen == 0 && !seeked) {
      // Nothing read at the current position: re-seek once and retry.
      // A failed seek must stop here; retrying the read would only hide the
      // real error code set by the seek.
      if (walReadSeekVerImpl(pRead, fetchVer) < 0) {
        pRead->curInvalid = 1;
        return -1;
      }
      seeked = true;
      continue;
    }

    // Read error, or a short read that a re-seek cannot explain.
    if (contLen < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
    } else {
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    }
    ASSERT(0);
    pRead->curInvalid = 1;
    return -1;
  }

  pRead->curInvalid = 0;
  return 0;
}

/*
 * Read the body of the record whose head is currently in pRead->pHead.
 *
 * The head buffer is grown with realloc when the body does not fit
 * (capacity < bodyLen), so pRead->pHead may change. After the read, the
 * record version and the body checksum are verified.
 *
 * Returns 0 and advances curVersion to version + 1 on success.
 * Returns -1 with terrno set and curInvalid = 1 on failure: the file position
 * is then somewhere inside the record, so the cursor must not be reused
 * without a seek.
 */
static int32_t walFetchBodyNew(SWalReader *pRead) {
  SWalCont *pReadHead = &pRead->pHead->head;
  int64_t   ver = pReadHead->version;

  wDebug("vgId:%d, wal starts to fetch body, index:%" PRId64, pRead->pWal->cfg.vgId, ver);

  if (pRead->capacity < pReadHead->bodyLen) {
    SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
    if (ptr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      // The head has been consumed from the file but the body has not.
      pRead->curInvalid = 1;
      return -1;
    }
    pRead->pHead = ptr;
    pReadHead = &pRead->pHead->head;
    pRead->capacity = pReadHead->bodyLen;
  }

  // Classify the failure by what the read returned, not by the requested length.
  int64_t readLen = taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen);
  if (readLen != pReadHead->bodyLen) {
    if (readLen < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s",
             pRead->pWal->cfg.vgId, pRead->pHead->head.version, ver, tstrerror(terrno));
    } else {
      wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted",
             pRead->pWal->cfg.vgId, pRead->pHead->head.version, ver);
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    }
    pRead->curInvalid = 1;
    ASSERT(0);
    return -1;
  }

  if (pReadHead->version != ver) {
    wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
           pRead->pHead->head.version, ver);
    pRead->curInvalid = 1;
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    ASSERT(0);
    return -1;
  }

  if (walValidBodyCksum(pRead->pHead) != 0) {
    wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
    pRead->curInvalid = 1;
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    ASSERT(0);
    return -1;
  }

  wDebug("vgId:%d, index:%" PRId64 " is fetched, cursor advance", pRead->pWal->cfg.vgId, ver);
  pRead->curVersion = ver + 1;
  return 0;
}

/*
 * Step over the body of the record whose head is in pRead->pHead by seeking
 * the log file forward by bodyLen bytes.
 *
 * Expects a valid cursor positioned at that record (both are asserted).
 * Returns 0 and increments curVersion on success. Returns -1 with terrno set
 * and curInvalid = 1 when the seek fails.
 */
static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
  ASSERT(pRead->curVersion == pRead->pHead->head.version);
  ASSERT(pRead->curInvalid == 0);

  int64_t pos = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
  if (pos >= 0) {
    pRead->curVersion++;
    wDebug("vgId:%d, version advance to %" PRId64 ", skip fetch", pRead->pWal->cfg.vgId, pRead->curVersion);
    return 0;
  }

  // seek failed
  terrno = TAOS_SYSTEM_ERROR(errno);
  pRead->curInvalid = 1;
  ASSERT(0);
  return -1;
}

L
Liu Jicong 已提交
348
/*
 * Read the record head of version `ver` into the caller-supplied pHead and
 * verify its checksum.
 *
 * pRead: reader to use; it is seeked when its cursor is invalid or not at `ver`.
 * ver:   version to fetch; versions above the WAL's applied version are rejected.
 * pHead: output buffer of at least sizeof(SWalCkHead) bytes.
 *
 * Returns 0 and clears curInvalid on success. Returns -1 on failure.
 * Except for the "ver > appliedVer" rejection, failures set terrno and leave
 * curInvalid = 1.
 * NOTE(review): the "ver > appliedVer" path returns -1 without touching
 * terrno -- confirm callers do not read terrno in that case.
 */
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
  int64_t code;
  int64_t contLen;
  bool    seeked = false;

  wDebug("vgId:%d try to fetch ver %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
         ", applied ver:%" PRId64,
         pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer,
         pRead->pWal->vers.appliedVer);

  // TODO: valid ver
  if (ver > pRead->pWal->vers.appliedVer) {
    return -1;
  }

  if (pRead->curInvalid || pRead->curVersion != ver) {
    code = walReadSeekVer(pRead, ver);
    if (code < 0) {
      pRead->curVersion = ver;
      pRead->curInvalid = 1;
      return -1;
    }
    seeked = true;
  }

  while (1) {
    contLen = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead));
    if (contLen == (int64_t)sizeof(SWalCkHead)) {
      break;
    }

    if (contLen == 0 && !seeked) {
      // Nothing read at the current position: re-seek once and retry.
      // Stop on a failed seek so its error code is the one reported.
      if (walReadSeekVerImpl(pRead, ver) < 0) {
        pRead->curInvalid = 1;
        return -1;
      }
      seeked = true;
      continue;
    }

    if (contLen < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
    } else {
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    }
    ASSERT(0);
    pRead->curInvalid = 1;
    return -1;
  }

  code = walValidHeadCksum(pHead);
  if (code != 0) {
    wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver);
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    // The file is now positioned after a head that cannot be trusted; force
    // a seek before the cursor is used again.
    pRead->curInvalid = 1;
    return -1;
  }

  pRead->curInvalid = 0;
  return 0;
}

L
Liu Jicong 已提交
405
/*
 * Step over the body of the record described by pHead by seeking the log
 * file forward by pHead->head.bodyLen bytes.
 *
 * Expects a valid cursor positioned at pHead's version (both are asserted).
 * Returns 0 and increments curVersion on success. Returns -1 with terrno set
 * and curInvalid = 1 when the seek fails.
 */
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
  wDebug("vgId:%d skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
         ", applied ver:%" PRId64,
         pRead->pWal->cfg.vgId, pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
         pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer);

  ASSERT(pRead->curVersion == pHead->head.version);
  ASSERT(pRead->curInvalid == 0);

  int64_t pos = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR);
  if (pos >= 0) {
    pRead->curVersion++;
    return 0;
  }

  // seek failed
  terrno = TAOS_SYSTEM_ERROR(errno);
  pRead->curInvalid = 1;
  return -1;
}

L
Liu Jicong 已提交
428
/*
 * Read the body that follows the head held in *ppHead.
 *
 * pRead:  reader positioned right after that head in the log file.
 * ppHead: in/out; the buffer is grown with realloc when the body does not fit
 *         (pRead->capacity < bodyLen), in which case *ppHead is replaced.
 *
 * After the read, the record version and the body checksum are verified.
 * Returns 0 and advances curVersion to version + 1 on success.
 * Returns -1 with terrno set and curInvalid = 1 on failure.
 */
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
  SWalCont *pReadHead = &((*ppHead)->head);
  int64_t   ver = pReadHead->version;

  wDebug("vgId:%d fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
         ", applied ver:%" PRId64,
         pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer,
         pRead->pWal->vers.appliedVer);

  if (pRead->capacity < pReadHead->bodyLen) {
    SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(*ppHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
    if (ptr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      // The head has been consumed from the file but the body has not.
      pRead->curInvalid = 1;
      return -1;
    }
    *ppHead = ptr;
    pReadHead = &((*ppHead)->head);
    pRead->capacity = pReadHead->bodyLen;
  }

  // Classify the failure by what the read returned, not by the requested length.
  int64_t readLen = taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen);
  if (readLen != pReadHead->bodyLen) {
    if (readLen < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s",
             pRead->pWal->cfg.vgId, pReadHead->version, ver, tstrerror(terrno));
    } else {
      wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted",
             pRead->pWal->cfg.vgId, pReadHead->version, ver);
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    }
    pRead->curInvalid = 1;
    ASSERT(0);
    return -1;
  }

  // In the checks below the assert comes last, so the error is logged and the
  // cursor invalidated even in builds where ASSERT aborts.
  if (pReadHead->version != ver) {
    wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
           pReadHead->version, ver);
    pRead->curInvalid = 1;
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    ASSERT(0);
    return -1;
  }

  if (walValidBodyCksum(*ppHead) != 0) {
    wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId,
           ver);
    pRead->curInvalid = 1;
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    ASSERT(0);
    return -1;
  }

  pRead->curVersion = ver + 1;
  return 0;
}

486
/*
 * Read the complete record (head + body) of version `ver` into
 * pReader->pHead, verifying both checksums and the stored version.
 *
 * The emptiness and range checks run before the reader mutex is taken; the
 * seek and all file reads run with pReader->mutex held. The head buffer is
 * grown with realloc when the body does not fit.
 *
 * Returns 0 and increments curVersion on success.
 * Returns -1 with terrno set on failure: TSDB_CODE_WAL_LOG_NOT_EXIST for an
 * empty WAL or an out-of-range version, otherwise the code of the step that
 * failed. Every failure after the mutex is taken leaves curInvalid = 1,
 * because the file position can no longer be trusted.
 * NOTE(review): success does not clear curInvalid here, so a reader that is
 * only driven through this function seeks on every call -- confirm that this
 * is intended.
 */
int32_t walReadVer(SWalReader *pReader, int64_t ver) {
  wDebug("vgId:%d, wal start to read index:%" PRId64, pReader->pWal->cfg.vgId, ver);
  int64_t contLen;
  int32_t code;
  bool    seeked = false;

  if (walIsEmpty(pReader->pWal)) {
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

  if (ver > pReader->pWal->vers.lastVer || ver < pReader->pWal->vers.firstVer) {
    wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
           ver, pReader->pWal->vers.firstVer, pReader->pWal->vers.lastVer);
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

  taosThreadMutexLock(&pReader->mutex);

  if (pReader->curInvalid || pReader->curVersion != ver) {
    if (walReadSeekVer(pReader, ver) < 0) {
      wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr());
      goto _err;
    }
    seeked = true;
  }

  while (1) {
    contLen = taosReadFile(pReader->pLogFile, pReader->pHead, sizeof(SWalCkHead));
    if (contLen == (int64_t)sizeof(SWalCkHead)) {
      break;
    }

    if (contLen == 0 && !seeked) {
      // Nothing read at the current position: re-seek once and retry.
      // Stop on a failed seek so its error code is the one reported.
      if (walReadSeekVerImpl(pReader, ver) < 0) {
        wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr());
        goto _err;
      }
      seeked = true;
      continue;
    }

    if (contLen < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
    } else {
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    }
    wError("vgId:%d, failed to read WAL record head, index:%" PRId64 ", from log file since %s",
           pReader->pWal->cfg.vgId, ver, terrstr());
    goto _err;
  }

  code = walValidHeadCksum(pReader->pHead);
  if (code != 0) {
    wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pReader->pWal->cfg.vgId,
           ver);
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    goto _err;
  }

  if (pReader->capacity < pReader->pHead->head.bodyLen) {
    SWalCkHead *ptr =
        (SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReader->pHead->head.bodyLen);
    if (ptr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
    pReader->pHead = ptr;
    pReader->capacity = pReader->pHead->head.bodyLen;
  }

  contLen = taosReadFile(pReader->pLogFile, pReader->pHead->head.body, pReader->pHead->head.bodyLen);
  if (contLen != pReader->pHead->head.bodyLen) {
    if (contLen < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
    } else {
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    }
    wError("vgId:%d, failed to read WAL record body, index:%" PRId64 ", from log file since %s",
           pReader->pWal->cfg.vgId, ver, terrstr());
    goto _err;
  }

  if (pReader->pHead->head.version != ver) {
    wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pReader->pWal->cfg.vgId,
           pReader->pHead->head.version, ver);
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    goto _err;
  }

  code = walValidBodyCksum(pReader->pHead);
  if (code != 0) {
    wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since body checksum not passed", pReader->pWal->cfg.vgId,
           ver);
    uint32_t readCkSum = walCalcBodyCksum(pReader->pHead->head.body, pReader->pHead->head.bodyLen);
    uint32_t logCkSum = pReader->pHead->cksumBody;
    wError("checksum written into log:%u, checksum calculated:%u", logCkSum, readCkSum);
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    goto _err;
  }

  pReader->curVersion++;

  taosThreadMutexUnlock(&pReader->mutex);
  return 0;

_err:
  // Single failure exit: whatever went wrong, the file position no longer
  // matches curVersion, so force a seek on the next use and drop the lock.
  pReader->curInvalid = 1;
  taosThreadMutexUnlock(&pReader->mutex);
  return -1;
}