walRead.c 12.4 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

L
Liu Jicong 已提交
16
#include "taoserror.h"
L
Liu Jicong 已提交
17
#include "walInt.h"
S
Shengliang Guan 已提交
18

L
Liu Jicong 已提交
19 20 21 22
// Sequential-scan helpers used by walNextValidMsg(). They read into (and skip over) the entry held in the
// reader-owned buffer pRead->pHead, advancing pRead->curVersion as they go.
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer);
static int32_t walFetchBodyNew(SWalReader *pRead);
static int32_t walSkipFetchBodyNew(SWalReader *pRead);

L
Liu Jicong 已提交
23 24
/*
 * Allocate a reader over pWal.
 *
 * cond: optional filter copied into the reader. When NULL, every filter field is zeroed
 *       (so scanMeta == 0 and scanUncommited == 0).
 *
 * The reader starts unpositioned (curVersion == -1, no files open). Sequential reads start from
 * curVersion, so position it (e.g. with walReadSeekVer()) before scanning.
 *
 * Returns the new reader, or NULL with terrno set when an allocation fails.
 */
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
  SWalReader *pRead = taosMemoryMalloc(sizeof(SWalReader));
  if (pRead == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // pHead initially holds only the entry header; capacity == 0 records that no body space exists yet.
  pRead->pHead = taosMemoryMalloc(sizeof(SWalCkHead));
  if (pRead->pHead == NULL) {
    terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
    taosMemoryFree(pRead);
    return NULL;
  }

  pRead->pWal = pWal;
  pRead->pIdxFile = NULL;
  pRead->pLogFile = NULL;
  pRead->curVersion = -1;
  pRead->curFileFirstVer = -1;
  pRead->capacity = 0;

  if (cond != NULL) {
    pRead->cond = *cond;
  } else {
    // Zero every filter field, not only scanMeta/scanUncommited, so no member is left uninitialized.
    SWalFilterCond defaultCond = {0};
    pRead->cond = defaultCond;
  }

  // Initialize the mutex last: the failure paths above then only have memory to release.
  taosThreadMutexInit(&pRead->mutex, NULL);

  return pRead;
}

L
Liu Jicong 已提交
55 56 57
/*
 * Release a reader created by walOpenReader().
 *
 * Closes the reader's idx/log files, destroys the mutex initialized by walOpenReader(), and frees the entry
 * buffer and the reader itself. A NULL reader is ignored.
 */
void walCloseReader(SWalReader *pRead) {
  if (pRead == NULL) {
    return;
  }

  taosCloseFile(&pRead->pIdxFile);
  taosCloseFile(&pRead->pLogFile);
  taosThreadMutexDestroy(&pRead->mutex);
  taosMemoryFreeClear(pRead->pHead);
  taosMemoryFree(pRead);
}

L
Liu Jicong 已提交
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
/*
 * Advance the reader to the next message worth returning, starting at pRead->curVersion.
 *
 * A message qualifies when it is TDMT_VND_SUBMIT, or a meta message (IS_META_MSG) while cond.scanMeta is set.
 * Other messages only have their headers read; their bodies are skipped. The scan ends at the last version
 * when cond.scanUncommited is set, otherwise at the committed version.
 *
 * Returns 0 with the full qualifying message in pRead->pHead. Otherwise returns -1 with terrno set, either
 * by the failing read helper or to TSDB_CODE_WAL_LOG_NOT_EXIST when no qualifying message exists up to the
 * end version.
 */
int32_t walNextValidMsg(SWalReader *pRead) {
  int64_t fetchVer = pRead->curVersion;
  int64_t endVer = pRead->cond.scanUncommited ? walGetLastVer(pRead->pWal) : walGetCommittedVer(pRead->pWal);

  while (fetchVer <= endVer) {
    if (walFetchHeadNew(pRead, fetchVer) < 0) {
      return -1;  // terrno set by walFetchHeadNew
    }

    if (pRead->pHead->head.msgType == TDMT_VND_SUBMIT ||
        (IS_META_MSG(pRead->pHead->head.msgType) && pRead->cond.scanMeta)) {
      if (walFetchBodyNew(pRead) < 0) {
        return -1;  // terrno set by walFetchBodyNew
      }
      return 0;
    }

    if (walSkipFetchBodyNew(pRead) < 0) {
      return -1;  // terrno set by walSkipFetchBodyNew
    }
    fetchVer++;
    ASSERT(fetchVer == pRead->curVersion);
  }

  // Ran past endVer without finding a qualifying message. Set terrno so callers do not see a stale error code.
  terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
  return -1;
}
L
Liu Jicong 已提交
85

L
Liu Jicong 已提交
86
/*
 * Position the reader's log file at the entry for `ver`.
 *
 * Uses the index file of the WAL file whose first version is fileFirstVer. Both files must already be open
 * on the reader. The index holds one SWalIdxEntry per version: the slot for `ver` sits at
 * (ver - fileFirstVer) * sizeof(SWalIdxEntry), and its `offset` is where the entry starts in the log file.
 *
 * Returns the new log-file offset (>= 0), or -1 with terrno set on I/O failure or index corruption.
 */
static int64_t walReadSeekFilePos(SWalReader *pRead, int64_t fileFirstVer, int64_t ver) {
  TdFilePtr pIdxFile = pRead->pIdxFile;
  TdFilePtr pLogFile = pRead->pLogFile;

  int64_t offset = (ver - fileFirstVer) * (int64_t)sizeof(SWalIdxEntry);
  int64_t ret = taosLSeekFile(pIdxFile, offset, SEEK_SET);
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    wError("failed to seek idx file, ver %" PRId64 ", pos: %" PRId64 ", since %s", ver, offset, terrstr());
    return -1;
  }

  SWalIdxEntry entry = {0};
  ret = taosReadFile(pIdxFile, &entry, sizeof(SWalIdxEntry));
  if (ret != sizeof(SWalIdxEntry)) {
    if (ret < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      wError("failed to read idx file, since %s", terrstr());
    } else {
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
      wError("read idx file incompletely, read bytes %" PRId64 ", bytes should be %zu", ret, sizeof(SWalIdxEntry));
    }
    return -1;
  }

  // One slot per version, so the slot must describe exactly `ver`. This was an ASSERT, which is compiled out
  // in release builds and let a corrupt index produce a wrong seek silently.
  if (entry.ver != ver) {
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    wError("idx entry version mismatch, expect ver %" PRId64 ", found ver %" PRId64, ver, (int64_t)entry.ver);
    return -1;
  }

  ret = taosLSeekFile(pLogFile, entry.offset, SEEK_SET);
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    wError("failed to seek log file, ver %" PRId64 ", pos: %" PRId64 ", since %s", ver, (int64_t)entry.offset,
           terrstr());
    return -1;
  }
  return ret;
}

L
Liu Jicong 已提交
122
/*
 * Switch the reader to the WAL file pair (log + idx) whose first version is fileFirstVer.
 *
 * Any previously open files are closed first. On success both files are open and curFileFirstVer records
 * fileFirstVer, so later seeks into the same file skip reopening it. On failure curFileFirstVer is -1, which
 * forces a reopen on the next seek, and -1 is returned with terrno set.
 */
static int32_t walReadChangeFile(SWalReader *pRead, int64_t fileFirstVer) {
  char fnameStr[WAL_FILE_LEN];

  taosCloseFile(&pRead->pIdxFile);
  taosCloseFile(&pRead->pLogFile);
  // The old pair is closed. Don't let a stale value claim it is still usable if an open below fails.
  pRead->curFileFirstVer = -1;

  walBuildLogName(pRead->pWal, fileFirstVer, fnameStr);
  TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_READ);
  if (pLogFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    wError("cannot open file %s, since %s", fnameStr, terrstr());
    return -1;
  }
  pRead->pLogFile = pLogFile;

  walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr);
  TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_READ);
  if (pIdxFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    wError("cannot open file %s, since %s", fnameStr, terrstr());
    return -1;
  }
  pRead->pIdxFile = pIdxFile;

  // Record which file is open so walReadSeekVer() can reuse it for later versions in the same file.
  pRead->curFileFirstVer = fileFirstVer;
  return 0;
}

L
Liu Jicong 已提交
150
/*
 * Position the reader so that the next header read returns the entry for `ver`.
 *
 * Returns 0 on success, with curVersion == ver. Returns -1 with terrno set on failure:
 *   - a version outside [vers.firstVer, vers.lastVer], or negative, fails with TSDB_CODE_WAL_LOG_NOT_EXIST;
 *   - file-change or seek errors come from walReadChangeFile()/walReadSeekFilePos().
 * After a failure that touched the files, curVersion is reset to -1 so the next read re-seeks instead of
 * trusting the current file position.
 */
int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
  SWal *pWal = pRead->pWal;

  // curVersion == -1 is the "not positioned" sentinel, so it must never count as a match.
  if (ver >= 0 && ver == pRead->curVersion) {
    wDebug("wal version %" PRId64 " match, no need to reset", ver);
    return 0;
  }

  if (ver < 0 || ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
    wError("invalid version: % " PRId64 ", first ver %" PRId64 ", last ver %" PRId64, ver, pWal->vers.firstVer,
           pWal->vers.lastVer);
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

  // Binary-search fileInfoSet (TD_LE) for the file whose first version is <= ver.
  SWalFileInfo  tmpInfo = {.firstVer = ver};
  SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
  if (pRet == NULL) {
    // Was an ASSERT; in release builds the NULL would have been dereferenced below.
    wError("no wal file found for version %" PRId64, ver);
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

  if (pRead->curFileFirstVer != pRet->firstVer) {
    if (walReadChangeFile(pRead, pRet->firstVer) < 0) {  // terrno set inside
      pRead->curVersion = -1;
      return -1;
    }
  }

  if (walReadSeekFilePos(pRead, pRet->firstVer, ver) < 0) {  // terrno set inside
    pRead->curVersion = -1;
    return -1;
  }

  wDebug("wal version reset from %" PRId64 " to %" PRId64, pRead->curVersion, ver);
  pRead->curVersion = ver;
  return 0;
}

L
Liu Jicong 已提交
188
/*
 * Tell the reader how many body bytes its entry buffer can currently hold.
 * The fetch functions compare this value against an entry's bodyLen to decide whether to grow the buffer.
 */
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) {
  pRead->capacity = capacity;
}
189

L
Liu Jicong 已提交
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254
/*
 * Read the entry header for fetchVer into pRead->pHead and verify its checksum.
 *
 * Seeks first unless the reader is already positioned at fetchVer. If a read done without a seek hits
 * end-of-file, the entry most likely lives in the next WAL file after a roll. In that case the position is
 * invalidated and the read is retried once after a fresh seek, which switches files as needed.
 * The header checksum is verified so the body fetch/skip that follows never trusts a corrupted bodyLen.
 *
 * Returns 0 on success, or -1 with terrno set. curVersion is reset to -1 on read/checksum failures.
 */
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
  if (fetchVer < 0) {
    // -1 is curVersion's "not positioned" sentinel. Without this check it would equal curVersion, the seek
    // would be skipped, and we would read from an unpositioned (possibly NULL) file.
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

  bool seeked = false;
  if (pRead->curVersion != fetchVer) {
    if (walReadSeekVer(pRead, fetchVer) < 0) return -1;
    seeked = true;
  }

  int64_t contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
  if (contLen == 0 && !seeked) {
    // EOF at the tail of the current file: force a real seek (curVersion no longer matches) and retry once.
    pRead->curVersion = -1;
    if (walReadSeekVer(pRead, fetchVer) < 0) return -1;
    contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
  }

  if (contLen != sizeof(SWalCkHead)) {
    if (contLen < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
    } else {
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    }
    pRead->curVersion = -1;
    return -1;
  }

  if (walValidHeadCksum(pRead->pHead) != 0) {
    wError("wal fetch head error: %" PRId64 ", since head checksum not passed", fetchVer);
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    pRead->curVersion = -1;
    return -1;
  }
  return 0;
}

/*
 * Read the body of the entry whose header is in pRead->pHead, growing pRead->pHead when the body does not
 * fit in pRead->capacity.
 *
 * On success the body checksum has been verified and curVersion advances to head.version + 1.
 * Returns -1 with terrno set on invalid length, allocation, I/O or checksum errors. Every such error resets
 * curVersion to -1, because the log-file position no longer matches it.
 */
static int32_t walFetchBodyNew(SWalReader *pRead) {
  SWalCont *pReadHead = &pRead->pHead->head;
  int64_t   ver = pReadHead->version;

  // A negative length would bypass the grow check and reach the read and checksum calls.
  if (pReadHead->bodyLen < 0) {
    wError("wal fetch body error: %" PRId64 ", since invalid body length %" PRId32, ver, pReadHead->bodyLen);
    pRead->curVersion = -1;
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    return -1;
  }

  if (pRead->capacity < pReadHead->bodyLen) {
    void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
    if (ptr == NULL) {
      terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
      // The header was already consumed from the file; a retry must re-seek rather than read the body as a header.
      pRead->curVersion = -1;
      return -1;
    }
    pRead->pHead = ptr;
    pReadHead = &pRead->pHead->head;
    pRead->capacity = pReadHead->bodyLen;
  }

  // Keep the read result: the error branch must inspect it, not bodyLen.
  int64_t contLen = taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen);
  if (contLen != pReadHead->bodyLen) {
    if (contLen < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      wError("wal fetch body error: %" PRId64 ", read request version:%" PRId64 ", since %s",
             pReadHead->version, ver, tstrerror(terrno));
    } else {
      wError("wal fetch body error: %" PRId64 ", read request version:%" PRId64 ", since file corrupted",
             pReadHead->version, ver);
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    }
    pRead->curVersion = -1;
    ASSERT(0);
    return -1;
  }

  if (walValidBodyCksum(pRead->pHead) != 0) {
    wError("wal fetch body error: % " PRId64 ", since body checksum not passed", ver);
    pRead->curVersion = -1;
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    ASSERT(0);
    return -1;
  }

  pRead->curVersion = ver + 1;
  wDebug("version advance to %" PRId64 ", fetch body", pRead->curVersion);
  return 0;
}

/*
 * Skip the body of the entry whose header is in pRead->pHead.
 *
 * Seeks bodyLen bytes forward in the log file, then advances curVersion by one.
 * Returns 0 on success, or -1 with terrno set (and curVersion reset to -1) if the seek fails.
 */
static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
  ASSERT(pRead->curVersion == pRead->pHead->head.version);

  int64_t code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
  if (code < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    pRead->curVersion = -1;
    return -1;
  }

  pRead->curVersion++;
  wDebug("version advance to %" PRId64 ", skip fetch", pRead->curVersion);
  return 0;
}

L
Liu Jicong 已提交
277
/*
 * Read the header of entry `ver` into the caller-provided pHead and verify its checksum.
 *
 * Only committed versions (0 <= ver <= vers.commitVer) can be fetched. Seeks unless the reader is already
 * positioned at ver. When a read done without a seek hits end-of-file (most likely because the entry lives in
 * the next WAL file after a roll), it re-seeks and retries once.
 *
 * Returns 0 on success, or -1 with terrno set. After a read or checksum failure curVersion is reset to -1,
 * because the file position no longer corresponds to it.
 */
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
  // TODO: valid ver
  if (ver < 0 || ver > pRead->pWal->vers.commitVer) {
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

  bool seeked = false;
  if (pRead->curVersion != ver) {
    if (walReadSeekVer(pRead, ver) < 0) return -1;
    seeked = true;
  }

  ASSERT(taosValidFile(pRead->pLogFile) == true);

  int64_t contLen = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead));
  if (contLen == 0 && !seeked) {
    // EOF at the tail of the current file: invalidate the position so walReadSeekVer() really seeks, then retry.
    pRead->curVersion = -1;
    if (walReadSeekVer(pRead, ver) < 0) return -1;
    contLen = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead));
  }

  if (contLen != sizeof(SWalCkHead)) {
    if (contLen < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
    } else {
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    }
    pRead->curVersion = -1;
    return -1;
  }

  if (walValidHeadCksum(pHead) != 0) {
    wError("unexpected wal log version: % " PRId64 ", since head checksum not passed", ver);
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    pRead->curVersion = -1;
    return -1;
  }

  return 0;
}

L
Liu Jicong 已提交
308
/*
 * Skip the body of the entry described by pHead, whose header the reader has just consumed.
 *
 * Moves the log-file position forward by bodyLen bytes and steps the reader to the next version.
 * Returns 0 on success. On seek failure it sets terrno, resets curVersion to -1 and returns -1.
 */
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
  ASSERT(pRead->curVersion == pHead->head.version);

  int64_t newPos = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR);
  if (newPos >= 0) {
    pRead->curVersion += 1;
    return 0;
  }

  terrno = TAOS_SYSTEM_ERROR(errno);
  pRead->curVersion = -1;
  return -1;
}

L
Liu Jicong 已提交
325
/*
 * Read the body of the entry whose header is already in *ppHead (e.g. filled by walFetchHead()).
 *
 * *ppHead is grown with realloc when the body does not fit in pRead->capacity; the caller must use the
 * updated pointer afterwards.
 * NOTE(review): pRead->capacity is also updated when pRead->pHead is grown by other readers in this file.
 * The grow decision is only correct if the caller's buffer is the one pRead->capacity tracks (for example,
 * set through walSetReaderCapacity()). Confirm with callers.
 *
 * On success the body checksum has been verified and curVersion advances to head.version + 1.
 * Returns -1 with terrno set on invalid length, allocation, I/O or checksum errors. Each of these resets
 * curVersion to -1, because the log-file position no longer matches it.
 */
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
  SWalCont *pReadHead = &((*ppHead)->head);
  int64_t   ver = pReadHead->version;

  // A negative length would bypass the grow check and reach the read and checksum calls.
  if (pReadHead->bodyLen < 0) {
    wError("wal fetch body error: %" PRId64 ", since invalid body length %" PRId32, ver, pReadHead->bodyLen);
    pRead->curVersion = -1;
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    return -1;
  }

  if (pRead->capacity < pReadHead->bodyLen) {
    void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
    if (ptr == NULL) {
      terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
      // The header was already consumed from the file; a retry must re-seek.
      pRead->curVersion = -1;
      return -1;
    }
    *ppHead = ptr;
    pReadHead = &((*ppHead)->head);
    pRead->capacity = pReadHead->bodyLen;
  }

  int64_t contLen = taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen);
  if (contLen != pReadHead->bodyLen) {
    if (contLen < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
    } else {
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    }
    wError("wal fetch body error: %" PRId64 ", since %s", ver, terrstr());
    pRead->curVersion = -1;
    ASSERT(0);
    return -1;
  }

  // There is no version re-check after the read: `ver` was copied from this same header field and reading
  // into body[] cannot change it.
  if (walValidBodyCksum(*ppHead) != 0) {
    wError("wal fetch body error: % " PRId64 ", since body checksum not passed", ver);
    pRead->curVersion = -1;
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    return -1;
  }

  pRead->curVersion = ver + 1;
  return 0;
}

L
Liu Jicong 已提交
363
/*
 * Read the complete entry (header and body) for `ver` into pRead->pHead.
 *
 * Seeks unless the reader is already positioned at ver. When a read done without a seek hits end-of-file
 * (most likely because the entry lives in the next WAL file after a roll), it re-seeks and retries once.
 * The header must carry exactly `ver`, both checksums are verified, and pRead->pHead is grown when the body
 * does not fit in pRead->capacity.
 *
 * Returns 0 on success, with curVersion advanced to ver + 1, or -1 with terrno set. Once file data has been
 * consumed, any failure resets curVersion to -1, so a retry re-seeks instead of reading from a stale position.
 */
int32_t walReadVer(SWalReader *pRead, int64_t ver) {
  SWal   *pWal = pRead->pWal;
  int64_t code;
  bool    seeked = false;

  if (pWal->vers.firstVer == -1) {
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

  // TODO: check wal life
  if (pRead->curVersion != ver) {
    if (walReadSeekVer(pRead, ver) < 0) {
      wError("unexpected wal log version: % " PRId64 ", since %s", ver, terrstr());
      return -1;
    }
    seeked = true;
  }

  if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
    wError("invalid version: % " PRId64 ", first ver %" PRId64 ", last ver %" PRId64, ver, pWal->vers.firstVer,
           pWal->vers.lastVer);
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

  ASSERT(taosValidFile(pRead->pLogFile) == true);

  code = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
  if (code == 0 && !seeked) {
    // EOF at the tail of the current file: invalidate the position so walReadSeekVer() really seeks (and
    // switches files if needed), then retry once.
    pRead->curVersion = -1;
    if (walReadSeekVer(pRead, ver) < 0) {
      wError("unexpected wal log version: % " PRId64 ", since %s", ver, terrstr());
      return -1;
    }
    code = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
  }
  if (code != sizeof(SWalCkHead)) {
    if (code < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
    } else {
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
      ASSERT(0);
    }
    pRead->curVersion = -1;
    return -1;
  }

  if (walValidHeadCksum(pRead->pHead) != 0) {
    wError("unexpected wal log version: % " PRId64 ", since head checksum not passed", ver);
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    pRead->curVersion = -1;
    return -1;
  }

  // Check the header's version before allocating for and reading a body that belongs to the wrong entry.
  if (pRead->pHead->head.version != ver) {
    wError("unexpected wal log version: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version,
           ver);
    pRead->curVersion = -1;
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    return -1;
  }

  if (pRead->capacity < pRead->pHead->head.bodyLen) {
    void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pRead->pHead->head.bodyLen);
    if (ptr == NULL) {
      terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
      // The header was consumed; without this a retry would read the body bytes as a header.
      pRead->curVersion = -1;
      return -1;
    }
    pRead->pHead = ptr;
    pRead->capacity = pRead->pHead->head.bodyLen;
  }

  code = taosReadFile(pRead->pLogFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen);
  if (code != pRead->pHead->head.bodyLen) {
    if (code < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
    } else {
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
      ASSERT(0);
    }
    pRead->curVersion = -1;
    return -1;
  }

  if (walValidBodyCksum(pRead->pHead) != 0) {
    wError("unexpected wal log version: % " PRId64 ", since body checksum not passed", ver);
    pRead->curVersion = -1;
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    return -1;
  }

  // curVersion == ver here: either it already matched or walReadSeekVer() set it.
  pRead->curVersion = ver + 1;
  return 0;
}