walRead.c 19.5 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

L
Liu Jicong 已提交
16
#include "taoserror.h"
L
Liu Jicong 已提交
17
#include "walInt.h"
S
Shengliang Guan 已提交
18

L
Liu Jicong 已提交
19 20 21 22
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer);
static int32_t walFetchBodyNew(SWalReader *pRead);
static int32_t walSkipFetchBodyNew(SWalReader *pRead);

L
Liu Jicong 已提交
23
// Allocate and initialise a WAL reader bound to pWal.
//
// pWal : WAL instance to read from; the pointer is stored in the reader.
// cond : optional filter condition. When non-NULL it is copied into the
//        reader; when NULL the filter flags set below are cleared.
//
// Returns the new reader, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when
// either allocation fails. Release the reader with walCloseReader().
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
  SWalReader *pReader = taosMemoryCalloc(1, sizeof(SWalReader));
  if (pReader == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // Allocate the head buffer before the mutex is initialised, so the failure
  // path only has plain memory to give back and never abandons a live mutex.
  pReader->pHead = taosMemoryMalloc(sizeof(SWalCkHead));
  if (pReader->pHead == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pReader);
    return NULL;
  }

  pReader->pWal = pWal;
  pReader->readerId = tGenIdPI64();
  pReader->pIdxFile = NULL;
  pReader->pLogFile = NULL;
  // -1 marks "no position yet" / "no file open yet".
  pReader->curVersion = -1;
  pReader->curFileFirstVer = -1;
  pReader->capacity = 0;

  if (cond) {
    pReader->cond = *cond;
  } else {
    pReader->cond.scanNotApplied = 0;
    pReader->cond.scanMeta = 0;
    pReader->cond.enableRef = 0;
  }

  taosThreadMutexInit(&pReader->mutex, NULL);

  return pReader;
}

62 63 64 65 66
// Release a reader: close its index and log files, then free the head buffer
// and the reader itself. A NULL reader is accepted and ignored, so callers can
// use this unconditionally in cleanup paths (walOpenReader may return NULL).
//
// NOTE(review): the mutex initialised in walOpenReader is not destroyed here;
// confirm whether that is intentional.
void walCloseReader(SWalReader *pReader) {
  if (pReader == NULL) {
    return;
  }

  taosCloseFile(&pReader->pIdxFile);
  taosCloseFile(&pReader->pLogFile);
  taosMemoryFreeClear(pReader->pHead);
  taosMemoryFree(pReader);
}

69 70 71 72
// Advance the reader to the next message that passes the reader's filter.
//
// Starting at pReader->curVersion, heads are fetched one by one up to the
// committed version. A message is accepted when it is a submit, a delete with
// cond.deleteMsg == 1, or a meta message with cond.scanMeta set; its body is
// then fetched into pReader->pHead. Any other message has its body skipped.
//
// Returns 0 when an accepted message was fetched, -1 otherwise. When nothing
// up to the committed version qualifies, terrno is set to
// TSDB_CODE_WAL_LOG_NOT_EXIST, both when the cursor is already past the end
// and when every remaining entry was skipped.
int32_t walNextValidMsg(SWalReader *pReader) {
  int64_t fetchVer = pReader->curVersion;
  int64_t lastVer = walGetLastVer(pReader->pWal);
  int64_t committedVer = walGetCommittedVer(pReader->pWal);

  // Only committed entries are eligible. Bounding by the applied version as
  // well (see TD-24010) is currently disabled.
  int64_t endVer = committedVer;

  wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
         ", end index:%" PRId64,
         pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, endVer);

  if (fetchVer > endVer) {
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

  while (fetchVer <= endVer) {
    if (walFetchHeadNew(pReader, fetchVer) < 0) {
      return -1;
    }

    int32_t type = pReader->pHead->head.msgType;
    bool    wanted = (type == TDMT_VND_SUBMIT) || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) ||
                  (IS_META_MSG(type) && pReader->cond.scanMeta);

    if (wanted) {
      if (walFetchBodyNew(pReader) < 0) {
        return -1;
      }
      return 0;
    }

    if (walSkipFetchBodyNew(pReader) < 0) {
      return -1;
    }

    // The skip advanced the cursor; continue from there.
    fetchVer = pReader->curVersion;
  }

  // Every entry up to endVer was filtered out. Report it the same way as the
  // "cursor already past the end" case above instead of leaving terrno stale.
  terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
  return -1;
}
L
Liu Jicong 已提交
114

115
// Return the reader's curVersion field.
int64_t walReaderGetCurrentVer(const SWalReader *pReader) {
  return pReader->curVersion;
}
116
// Return the first version of the WAL this reader is attached to, as reported
// by walGetFirstVer().
int64_t walReaderGetValidFirstVer(const SWalReader *pReader) {
  return walGetFirstVer(pReader->pWal);
}
117 118 119 120 121 122 123 124
// Atomically store `ver` into the reader's skipToVersion field.
void walReaderSetSkipToVersion(SWalReader *pReader, int64_t ver) {
  atomic_store_64(&pReader->skipToVersion, ver);
}

// Read-and-clear accessor: returns the pending skipToVersion and resets the
// field to 0. This function is NOT multi-thread safe, and does not need to be.
int64_t walReaderGetSkipToVersion(SWalReader *pReader) {
  const int64_t pending = pReader->skipToVersion;
  pReader->skipToVersion = 0;
  return pending;
}
125

126 127 128 129 130 131 132
// Report the version range this reader may read.
//
// *sver receives the WAL's first version. *ever receives the last version when
// the reader's filter has scanUncommited set, otherwise the committed version.
void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *ever) {
  *sver = walGetFirstVer(pReader->pWal);

  const int64_t newestVer = walGetLastVer(pReader->pWal);
  const int64_t commitVer = walGetCommittedVer(pReader->pWal);

  if (pReader->cond.scanUncommited) {
    *ever = newestVer;
  } else {
    *ever = commitVer;
  }
}

133 134 135 136 137 138
// Clamp pOffset->version so it is never older than the WAL's first version.
// The first version is sampled while holding the WAL mutex.
void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset){
  taosThreadMutexLock(&pWalReader->pWal->mutex);
  const int64_t oldestVer = walGetFirstVer(pWalReader->pWal);
  taosThreadMutexUnlock(&pWalReader->pWal->mutex);

  // Offset is already inside the retained range: nothing to adjust.
  if (!(pOffset->version < oldestVer)) {
    return;
  }

  // Offset version is smaller than the first version: seek to the first version.
  pOffset->version = oldestVer;
}

144
// Position the reader's already-open idx/log file pair at version `ver`.
//
// fileFirstVer is the first version stored in the currently open file pair.
// The index entry for `ver` is located at (ver - fileFirstVer) entries into
// the idx file; its `offset` field is then used to seek the log file.
//
// Returns the value of the final log-file seek (>= 0) on success, or -1 with
// terrno set: a system error for failed seeks/reads, or
// TSDB_CODE_WAL_FILE_CORRUPTED when the index entry could only be read
// partially.
static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) {
  int64_t ret = 0;

  TdFilePtr pIdxTFile = pReader->pIdxFile;
  TdFilePtr pLogTFile = pReader->pLogFile;

  // seek position
  int64_t offset = (ver - fileFirstVer) * (int64_t)sizeof(SWalIdxEntry);
  ret = taosLSeekFile(pIdxTFile, offset, SEEK_SET);
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    wError("vgId:%d, failed to seek idx file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pReader->pWal->cfg.vgId,
           ver, offset, terrstr());
    return -1;
  }

  SWalIdxEntry entry = {0};
  if ((ret = taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry))) != sizeof(SWalIdxEntry)) {
    if (ret < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      wError("vgId:%d, failed to read idx file, since %s", pReader->pWal->cfg.vgId, terrstr());
    } else {
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
      // sizeof yields size_t; print it as int64_t so the conversion specifier
      // matches the argument on every platform (the old "%ld" did not).
      wError("vgId:%d, read idx file incompletely, read bytes %" PRId64 ", bytes should be %" PRId64,
             pReader->pWal->cfg.vgId, ret, (int64_t)sizeof(SWalIdxEntry));
    }
    return -1;
  }

  ret = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    wError("vgId:%d, failed to seek log file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pReader->pWal->cfg.vgId,
           ver, entry.offset, terrstr());
    return -1;
  }
  return ret;
}

182
// Switch the reader to the log/idx file pair whose first version is
// fileFirstVer.
//
// The currently open pair is closed first. The new handles are installed in
// the reader, and curFileFirstVer updated, only when BOTH files open
// successfully. On failure nothing from the new pair is left in the reader,
// and curFileFirstVer is -1 so a later seek cannot mistake the reader for
// still having the old file open.
//
// Returns 0 on success, -1 with terrno set to the system error otherwise.
static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) {
  char fnameStr[WAL_FILE_LEN] = {0};

  taosCloseFile(&pReader->pIdxFile);
  taosCloseFile(&pReader->pLogFile);
  // No file is open any more; drop the marker that identifies the open pair
  // (same value walReadReset uses after closing the files).
  pReader->curFileFirstVer = -1;

  walBuildLogName(pReader->pWal, fileFirstVer, fnameStr);
  TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_READ);
  if (pLogFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    wError("vgId:%d, cannot open file %s, since %s", pReader->pWal->cfg.vgId, fnameStr, terrstr());
    return -1;
  }

  walBuildIdxName(pReader->pWal, fileFirstVer, fnameStr);
  TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_READ);
  if (pIdxFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    wError("vgId:%d, cannot open file %s, since %s", pReader->pWal->cfg.vgId, fnameStr, terrstr());
    // Do not keep a log handle without its matching index.
    taosCloseFile(&pLogFile);
    return -1;
  }

  pReader->pLogFile = pLogFile;
  pReader->pIdxFile = pIdxFile;
  pReader->curFileFirstVer = fileFirstVer;

  return 0;
}

213 214
// Seek the reader to version `ver` without any range pre-checks.
//
// Looks up the file covering `ver` in pWal->fileInfoSet, switches the open
// file pair if it differs from the one already open, positions both files and
// finally records `ver` as the reader's current version.
//
// Returns 0 on success. Returns -1 with terrno = TSDB_CODE_WAL_INVALID_VER
// when no file is found, or -1 with terrno set by the helper that failed.
int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
  SWal *pWal = pReader->pWal;

  // Binary search in the file set. Only firstVer of the key is filled in.
  // NOTE(review): presumably TD_LE picks the last file whose firstVer <= ver;
  // confirm against taosArraySearch.
  SWalFileInfo key;
  key.firstVer = ver;
  SWalFileInfo *pFile = taosArraySearch(pWal->fileInfoSet, &key, compareWalFileInfo, TD_LE);
  if (pFile == NULL) {
    wError("failed to find WAL log file with ver:%" PRId64, ver);
    terrno = TSDB_CODE_WAL_INVALID_VER;
    return -1;
  }

  // Reopen only when the target lives in a different file; terrno is set by
  // the callee on failure.
  bool needSwitch = (pReader->curFileFirstVer != pFile->firstVer);
  if (needSwitch && walReadChangeFile(pReader, pFile->firstVer) < 0) {
    return -1;
  }

  // terrno is set by the callee on failure.
  if (walReadSeekFilePos(pReader, pFile->firstVer, ver) < 0) {
    return -1;
  }

  wDebug("vgId:%d, wal version reset from %" PRId64 " to %" PRId64, pReader->pWal->cfg.vgId,
         pReader->curVersion, ver);

  pReader->curVersion = ver;
  return 0;
}

245
// Seek the reader to version `ver`, validating it against the WAL's
// [firstVer, lastVer] range first.
//
// Returns 0 when the reader is already at `ver` or the seek succeeds.
// Returns -1 with terrno = TSDB_CODE_WAL_LOG_NOT_EXIST when `ver` is out of
// range, or -1 with terrno set by walReadSeekVerImpl when the seek fails.
int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) {
  SWal *pWal = pReader->pWal;

  if (pReader->curVersion == ver) {
    wDebug("vgId:%d, wal index:%" PRId64 " match, no need to reset", pReader->pWal->cfg.vgId, ver);
    return 0;
  }

  bool inRange = (ver <= pWal->vers.lastVer) && (ver >= pWal->vers.firstVer);
  if (!inRange) {
    wInfo("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
          ver, pWal->vers.firstVer, pWal->vers.lastVer);
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

  return (walReadSeekVerImpl(pReader, ver) < 0) ? -1 : 0;
}

L
Liu Jicong 已提交
266
// Overwrite the reader's recorded body-buffer capacity. The buffer itself is
// not resized here.
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) {
  pRead->capacity = capacity;
}
267

L
Liu Jicong 已提交
268 269
// Read the record head for version fetchVer into pRead->pHead.
//
// If the reader is not positioned at fetchVer it is seeked there first. When
// the read returns 0 bytes and no seek has been done yet in this call, the
// reader is re-seeked once with walReadSeekVerImpl and the read is retried.
//
// Returns 0 on success, -1 on failure. For a failed or short read terrno is
// the system error (negative read result) or TSDB_CODE_WAL_FILE_CORRUPTED.
// Note: the head checksum is not validated here.
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
  bool didSeek = false;

  wDebug("vgId:%d, wal starts to fetch head, index:%" PRId64, pRead->pWal->cfg.vgId, fetchVer);

  if (pRead->curVersion != fetchVer) {
    if (walReaderSeekVer(pRead, fetchVer) < 0) {
      return -1;
    }
    didSeek = true;
  }

  for (;;) {
    int64_t nRead = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
    if (nRead == sizeof(SWalCkHead)) {
      return 0;
    }

    // Nothing read and no seek done yet in this call: re-seek once, retry.
    if (nRead == 0 && !didSeek) {
      if (walReadSeekVerImpl(pRead, fetchVer) < 0) {
        return -1;
      }
      didSeek = true;
      continue;
    }

    if (nRead < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
    } else {
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    }
    return -1;
  }
}

L
Liu Jicong 已提交
304 305
// Read the body that follows the head currently held in pReader->pHead.
//
// The head buffer is grown with realloc when the recorded capacity is smaller
// than the body length, so pReader->pHead may change. After a complete read
// the body checksum is validated and the cursor advances to version + 1.
//
// Returns 0 on success, -1 on failure with terrno set:
//   TSDB_CODE_OUT_OF_MEMORY       - buffer could not be grown
//   system error                  - the read itself failed (negative result)
//   TSDB_CODE_WAL_FILE_CORRUPTED  - short read or checksum mismatch
static int32_t walFetchBodyNew(SWalReader *pReader) {
  SWalCont *pReadHead = &pReader->pHead->head;
  int64_t   ver = pReadHead->version;

  wDebug("vgId:%d, wal starts to fetch body, ver:%" PRId64 " ,len:%d, total", pReader->pWal->cfg.vgId, ver,
         pReadHead->bodyLen);

  if (pReader->capacity < pReadHead->bodyLen) {
    SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
    if (ptr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    // The block may have moved: refresh every pointer derived from it.
    pReader->pHead = ptr;
    pReadHead = &pReader->pHead->head;
    pReader->capacity = pReadHead->bodyLen;
  }

  int64_t nRead = taosReadFile(pReader->pLogFile, pReadHead->body, pReadHead->bodyLen);
  if (nRead != pReadHead->bodyLen) {
    // Classify by the read result, not by bodyLen: a negative result is an
    // I/O error, anything else is a short read, i.e. a truncated file.
    if (nRead < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s",
             pReader->pWal->cfg.vgId, pReader->pHead->head.version, ver, tstrerror(terrno));
    } else {
      wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted",
             pReader->pWal->cfg.vgId, pReader->pHead->head.version, ver);
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    }
    return -1;
  }

  if (walValidBodyCksum(pReader->pHead) != 0) {
    wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pReader->pWal->cfg.vgId, ver);
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    return -1;
  }

  wDebug("vgId:%d, index:%" PRId64 " is fetched, type:%d, cursor advance", pReader->pWal->cfg.vgId, ver,
         pReader->pHead->head.msgType);
  pReader->curVersion = ver + 1;
  return 0;
}

// Skip over the body of the record whose head is in pRead->pHead by seeking
// the log file forward by bodyLen bytes, then advance the cursor by one.
//
// Returns 0 on success, -1 with terrno set to the system error if the seek
// fails (the cursor is left unchanged in that case).
static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
  int64_t pos = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
  if (pos < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  pRead->curVersion += 1;
  wDebug("vgId:%d, version advance to %" PRId64 ", skip fetch", pRead->pWal->cfg.vgId, pRead->curVersion);

  return 0;
}

L
Liu Jicong 已提交
363
// Read the record head for version `ver` into the caller-supplied pHead and
// validate its head checksum.
//
// Versions beyond the WAL's commit version are rejected with -1 (terrno is
// not touched on that path). If the reader is not positioned at `ver` it is
// seeked there first. A 0-byte read with no seek done yet in this call
// triggers one re-seek and retry.
//
// Returns 0 on success, -1 on failure. For read failures terrno is the system
// error (negative read result) or TSDB_CODE_WAL_FILE_CORRUPTED. A checksum
// mismatch also yields TSDB_CODE_WAL_FILE_CORRUPTED.
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
  bool didSeek = false;

  wDebug("vgId:%d, try to fetch ver %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
         ", applied ver:%" PRId64,
         pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer,
         pRead->pWal->vers.appliedVer);

  // TODO: valid ver
  if (ver > pRead->pWal->vers.commitVer) {
    return -1;
  }

  if (pRead->curVersion != ver) {
    if (walReaderSeekVer(pRead, ver) < 0) {
      return -1;
    }
    didSeek = true;
  }

  for (;;) {
    int64_t nRead = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead));
    if (nRead == sizeof(SWalCkHead)) {
      break;
    }

    // Nothing read and no seek done yet in this call: re-seek once, retry.
    if (nRead == 0 && !didSeek) {
      if (walReadSeekVerImpl(pRead, ver) < 0) {
        return -1;
      }
      didSeek = true;
      continue;
    }

    if (nRead < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
    } else {
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    }
    return -1;
  }

  if (walValidHeadCksum(pHead) != 0) {
    wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver);
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    return -1;
  }

  return 0;
}

L
Liu Jicong 已提交
421
// Skip the body belonging to pHead by seeking the log file forward by
// pHead->head.bodyLen bytes, then advance the reader's cursor by one version.
//
// Returns 0 on success, -1 with terrno set to the system error if the seek
// fails (the cursor is left unchanged in that case).
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
  wDebug("vgId:%d, skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
         ", applied ver:%" PRId64,
         pRead->pWal->cfg.vgId, pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
         pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer);

  int64_t pos = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR);
  if (pos < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  pRead->curVersion += 1;

  return 0;
}

L
Liu Jicong 已提交
441
// Read the body that follows the head in *ppHead.
//
// *ppHead is grown with realloc when pRead->capacity is smaller than the body
// length, so the caller's pointer may be replaced. After a complete read the
// version and body checksum are verified and the cursor advances to
// version + 1.
//
// Returns 0 on success, -1 on failure with terrno set:
//   TSDB_CODE_OUT_OF_MEMORY       - buffer could not be grown
//   system error                  - the read itself failed (negative result)
//   TSDB_CODE_WAL_FILE_CORRUPTED  - short read, version or checksum mismatch
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
  SWalCont *pReadHead = &((*ppHead)->head);
  int64_t   ver = pReadHead->version;

  wDebug("vgId:%d, fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
         ", applied ver:%" PRId64,
         pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer,
         pRead->pWal->vers.appliedVer);

  if (pRead->capacity < pReadHead->bodyLen) {
    SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(*ppHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
    if (ptr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    // The block may have moved: hand the new pointer back to the caller and
    // refresh the local view of the head.
    *ppHead = ptr;
    pReadHead = &((*ppHead)->head);
    pRead->capacity = pReadHead->bodyLen;
  }

  int64_t nRead = taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen);
  if (nRead != pReadHead->bodyLen) {
    // Classify by the read result, not by bodyLen: a negative result is an
    // I/O error, anything else is a short read, i.e. a truncated file.
    if (nRead < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s",
             pRead->pWal->cfg.vgId, pReadHead->version, ver, tstrerror(terrno));
    } else {
      wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted",
             pRead->pWal->cfg.vgId, pReadHead->version, ver);
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    }
    return -1;
  }

  if (pReadHead->version != ver) {
    wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
           pReadHead->version, ver);
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    return -1;
  }

  if (walValidBodyCksum(*ppHead) != 0) {
    wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId,
           ver);
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    return -1;
  }

  pRead->curVersion = ver + 1;
  return 0;
}

495
// Read the complete record (head + body) for version `ver` into
// pReader->pHead.
//
// The WAL must be non-empty and `ver` must lie in [firstVer, lastVer];
// otherwise -1 is returned with terrno = TSDB_CODE_WAL_LOG_NOT_EXIST before
// the reader mutex is taken. Everything after that runs with pReader->mutex
// held and leaves through the single unlock at the bottom.
//
// Returns 0 on success (cursor advanced by one), -1 on failure with terrno
// set by the failing step.
int32_t walReadVer(SWalReader *pReader, int64_t ver) {
  wDebug("vgId:%d, wal start to read index:%" PRId64, pReader->pWal->cfg.vgId, ver);

  int32_t result = -1;
  int64_t contLen = 0;
  bool    didSeek = false;

  if (walIsEmpty(pReader->pWal)) {
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

  if (ver > pReader->pWal->vers.lastVer || ver < pReader->pWal->vers.firstVer) {
    wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
           ver, pReader->pWal->vers.firstVer, pReader->pWal->vers.lastVer);
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

  taosThreadMutexLock(&pReader->mutex);

  if (pReader->curVersion != ver) {
    if (walReaderSeekVer(pReader, ver) < 0) {
      wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr());
      goto done;
    }
    didSeek = true;
  }

  // Read the head. A 0-byte read with no seek done yet gets one re-seek+retry.
  for (;;) {
    contLen = taosReadFile(pReader->pLogFile, pReader->pHead, sizeof(SWalCkHead));
    if (contLen == sizeof(SWalCkHead)) {
      break;
    }

    if (contLen == 0 && !didSeek) {
      if (walReadSeekVerImpl(pReader, ver) < 0) {
        goto done;
      }
      didSeek = true;
      continue;
    }

    if (contLen < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
    } else {
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    }
    wError("vgId:%d, failed to read WAL record head, index:%" PRId64 ", from log file since %s",
           pReader->pWal->cfg.vgId, ver, terrstr());
    goto done;
  }

  if (walValidHeadCksum(pReader->pHead) != 0) {
    wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pReader->pWal->cfg.vgId,
           ver);
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    goto done;
  }

  // Grow the buffer so the body fits behind the head.
  if (pReader->capacity < pReader->pHead->head.bodyLen) {
    SWalCkHead *ptr =
        (SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReader->pHead->head.bodyLen);
    if (ptr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto done;
    }
    pReader->pHead = ptr;
    pReader->capacity = pReader->pHead->head.bodyLen;
  }

  contLen = taosReadFile(pReader->pLogFile, pReader->pHead->head.body, pReader->pHead->head.bodyLen);
  if (contLen != pReader->pHead->head.bodyLen) {
    if (contLen < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
    } else {
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    }
    wError("vgId:%d, failed to read WAL record body, index:%" PRId64 ", from log file since %s",
           pReader->pWal->cfg.vgId, ver, terrstr());
    goto done;
  }

  if (pReader->pHead->head.version != ver) {
    wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pReader->pWal->cfg.vgId,
           pReader->pHead->head.version, ver);
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    goto done;
  }

  if (walValidBodyCksum(pReader->pHead) != 0) {
    wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since body checksum not passed", pReader->pWal->cfg.vgId,
           ver);
    // Log both checksums to help diagnose the mismatch.
    uint32_t readCkSum = walCalcBodyCksum(pReader->pHead->head.body, pReader->pHead->head.bodyLen);
    uint32_t logCkSum = pReader->pHead->cksumBody;
    wError("checksum written into log:%u, checksum calculated:%u", logCkSum, readCkSum);
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    goto done;
  }

  pReader->curVersion++;
  result = 0;

done:
  taosThreadMutexUnlock(&pReader->mutex);
  return result;
}
609 610 611 612 613 614

// Return the reader to its "nothing open" state while holding the reader
// mutex: both files are closed and the current-file / current-version markers
// go back to -1.
void walReadReset(SWalReader *pReader) {
  taosThreadMutexLock(&pReader->mutex);

  taosCloseFile(&pReader->pIdxFile);
  taosCloseFile(&pReader->pLogFile);

  pReader->curVersion = -1;
  pReader->curFileFirstVer = -1;

  taosThreadMutexUnlock(&pReader->mutex);
}