walRead.c 10.6 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

L
Liu Jicong 已提交
16
#include "taoserror.h"
L
Liu Jicong 已提交
17
#include "walInt.h"
S
Shengliang Guan 已提交
18

L
Liu Jicong 已提交
19
SWalReadHandle *walOpenReadHandle(SWal *pWal) {
wafwerar's avatar
wafwerar 已提交
20
  SWalReadHandle *pRead = taosMemoryMalloc(sizeof(SWalReadHandle));
L
Liu Jicong 已提交
21
  if (pRead == NULL) {
22
    terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
23 24
    return NULL;
  }
25

L
Liu Jicong 已提交
26
  pRead->pWal = pWal;
27 28
  pRead->pReadIdxTFile = NULL;
  pRead->pReadLogTFile = NULL;
L
Liu Jicong 已提交
29 30 31 32
  pRead->curVersion = -1;
  pRead->curFileFirstVer = -1;
  pRead->capacity = 0;
  pRead->status = 0;
L
fix  
Liu Jicong 已提交
33 34 35

  taosThreadMutexInit(&pRead->mutex, NULL);

wafwerar's avatar
wafwerar 已提交
36
  pRead->pHead = taosMemoryMalloc(sizeof(SWalHead));
L
Liu Jicong 已提交
37
  if (pRead->pHead == NULL) {
L
Liu Jicong 已提交
38
    terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
wafwerar's avatar
wafwerar 已提交
39
    taosMemoryFree(pRead);
L
Liu Jicong 已提交
40 41 42
    return NULL;
  }
  return pRead;
L
Liu Jicong 已提交
43 44 45
}

void walCloseReadHandle(SWalReadHandle *pRead) {
46 47
  taosCloseFile(&pRead->pReadIdxTFile);
  taosCloseFile(&pRead->pReadLogTFile);
wafwerar's avatar
wafwerar 已提交
48 49
  taosMemoryFreeClear(pRead->pHead);
  taosMemoryFree(pRead);
L
Liu Jicong 已提交
50 51
}

L
Liu Jicong 已提交
52 53 54 55
int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) {
  // TODO
  return 0;
}
L
Liu Jicong 已提交
56

L
Liu Jicong 已提交
57
static int64_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, int64_t ver) {
L
Liu Jicong 已提交
58
  int64_t ret = 0;
L
Liu Jicong 已提交
59

60 61
  TdFilePtr pIdxTFile = pRead->pReadIdxTFile;
  TdFilePtr pLogTFile = pRead->pReadLogTFile;
L
Liu Jicong 已提交
62 63

  // seek position
L
Liu Jicong 已提交
64
  int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry);
L
Liu Jicong 已提交
65 66
  ret = taosLSeekFile(pIdxTFile, offset, SEEK_SET);
  if (ret < 0) {
L
Liu Jicong 已提交
67
    terrno = TAOS_SYSTEM_ERROR(errno);
L
Liu Jicong 已提交
68
    wError("failed to seek idx file, ver %ld, pos: %ld, since %s", ver, offset, terrstr());
L
Liu Jicong 已提交
69 70
    return -1;
  }
L
Liu Jicong 已提交
71
  SWalIdxEntry entry = {0};
L
Liu Jicong 已提交
72 73 74 75 76 77
  if ((ret = taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry))) != sizeof(SWalIdxEntry)) {
    if (ret < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      wError("failed to read idx file, since %s", terrstr());
    } else {
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
L
Liu Jicong 已提交
78
      wError("read idx file incompletely, read bytes %ld, bytes should be %lu", ret, sizeof(SWalIdxEntry));
L
Liu Jicong 已提交
79
    }
L
Liu Jicong 已提交
80 81
    return -1;
  }
L
Liu Jicong 已提交
82

L
Liu Jicong 已提交
83
  ASSERT(entry.ver == ver);
L
Liu Jicong 已提交
84 85
  ret = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
  if (ret < 0) {
L
Liu Jicong 已提交
86
    terrno = TAOS_SYSTEM_ERROR(errno);
L
Liu Jicong 已提交
87
    wError("failed to seek log file, ver %ld, pos: %ld, since %s", ver, entry.offset, terrstr());
L
Liu Jicong 已提交
88 89
    return -1;
  }
L
Liu Jicong 已提交
90
  return ret;
L
Liu Jicong 已提交
91 92 93 94 95
}

static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
  char fnameStr[WAL_FILE_LEN];

96 97
  taosCloseFile(&pRead->pReadIdxTFile);
  taosCloseFile(&pRead->pReadLogTFile);
L
Liu Jicong 已提交
98 99

  walBuildLogName(pRead->pWal, fileFirstVer, fnameStr);
100 101
  TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_READ);
  if (pLogTFile == NULL) {
L
Liu Jicong 已提交
102
    terrno = TAOS_SYSTEM_ERROR(errno);
L
Liu Jicong 已提交
103
    wError("cannot open file %s, since %s", fnameStr, terrstr());
L
Liu Jicong 已提交
104 105 106
    return -1;
  }

L
Liu Jicong 已提交
107 108
  pRead->pReadLogTFile = pLogTFile;

L
Liu Jicong 已提交
109
  walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr);
110 111
  TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ);
  if (pIdxTFile == NULL) {
L
Liu Jicong 已提交
112
    terrno = TAOS_SYSTEM_ERROR(errno);
L
Liu Jicong 已提交
113
    wError("cannot open file %s, since %s", fnameStr, terrstr());
L
Liu Jicong 已提交
114 115 116
    return -1;
  }

117
  pRead->pReadIdxTFile = pIdxTFile;
L
Liu Jicong 已提交
118 119 120 121 122
  return 0;
}

static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
  SWal *pWal = pRead->pWal;
L
Liu Jicong 已提交
123
  if (ver == pRead->curVersion) {
L
Liu Jicong 已提交
124 125
    return 0;
  }
L
Liu Jicong 已提交
126
  if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
L
Liu Jicong 已提交
127
    wError("invalid version: % " PRId64 ", first ver %ld, last ver %ld", ver, pWal->vers.firstVer, pWal->vers.lastVer);
L
Liu Jicong 已提交
128
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
L
Liu Jicong 已提交
129 130
    return -1;
  }
L
Liu Jicong 已提交
131
  if (ver < pWal->vers.snapshotVer) {
L
Liu Jicong 已提交
132 133
  }

L
Liu Jicong 已提交
134
  SWalFileInfo tmpInfo;
L
Liu Jicong 已提交
135
  tmpInfo.firstVer = ver;
L
Liu Jicong 已提交
136
  // bsearch in fileSet
L
Liu Jicong 已提交
137
  SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
L
Liu Jicong 已提交
138
  ASSERT(pRet != NULL);
L
Liu Jicong 已提交
139
  if (pRead->curFileFirstVer != pRet->firstVer) {
L
Liu Jicong 已提交
140
    // error code set inner
L
Liu Jicong 已提交
141
    if (walReadChangeFile(pRead, pRet->firstVer) < 0) {
L
Liu Jicong 已提交
142 143 144 145
      return -1;
    }
  }

L
Liu Jicong 已提交
146
  // error code set inner
L
Liu Jicong 已提交
147
  if (walReadSeekFilePos(pRead, pRet->firstVer, ver) < 0) {
L
Liu Jicong 已提交
148 149
    return -1;
  }
L
Liu Jicong 已提交
150

L
Liu Jicong 已提交
151
  pRead->curVersion = ver;
L
Liu Jicong 已提交
152

L
Liu Jicong 已提交
153 154 155
  return 0;
}

156 157 158
void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity) { pRead->capacity = capacity; }

int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) {
L
Liu Jicong 已提交
159
  int64_t code;
L
Liu Jicong 已提交
160

161
  // TODO: valid ver
L
Liu Jicong 已提交
162 163 164
  if (ver > pRead->pWal->vers.commitVer) {
    return -1;
  }
165 166 167 168 169 170

  if (pRead->curVersion != ver) {
    code = walReadSeekVer(pRead, ver);
    if (code < 0) return -1;
  }

171
  ASSERT(taosValidFile(pRead->pReadLogTFile) == true);
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189

  code = taosReadFile(pRead->pReadLogTFile, pHead, sizeof(SWalHead));
  if (code != sizeof(SWalHead)) {
    return -1;
  }

  code = walValidHeadCksum(pHead);

  if (code != 0) {
    wError("unexpected wal log version: % " PRId64 ", since head checksum not passed", ver);
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    return -1;
  }

  return 0;
}

int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalHead *pHead) {
L
Liu Jicong 已提交
190
  int64_t code;
191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216

  ASSERT(pRead->curVersion == pHead->head.version);

  code = taosLSeekFile(pRead->pReadLogTFile, pHead->head.bodyLen, SEEK_CUR);
  if (code < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    pRead->curVersion = -1;
    return -1;
  }

  pRead->curVersion++;

  return 0;
}

int32_t walFetchBody(SWalReadHandle *pRead, SWalHead **ppHead) {
  SWalReadHead *pReadHead = &((*ppHead)->head);
  int64_t       ver = pReadHead->version;

  if (pRead->capacity < pReadHead->bodyLen) {
    void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalHead) + pReadHead->bodyLen);
    if (ptr == NULL) {
      terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
      return -1;
    }
    *ppHead = ptr;
L
Liu Jicong 已提交
217
    pReadHead = &((*ppHead)->head);
218 219 220 221
    pRead->capacity = pReadHead->bodyLen;
  }

  if (pReadHead->bodyLen != taosReadFile(pRead->pReadLogTFile, pReadHead->body, pReadHead->bodyLen)) {
L
Liu Jicong 已提交
222
    ASSERT(0);
223 224 225 226
    return -1;
  }

  if (pReadHead->version != ver) {
L
Liu Jicong 已提交
227
    wError("wal fetch body error: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version, ver);
228 229 230 231 232 233
    pRead->curVersion = -1;
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    return -1;
  }

  if (walValidBodyCksum(*ppHead) != 0) {
L
Liu Jicong 已提交
234
    wError("wal fetch body error: % " PRId64 ", since body checksum not passed", ver);
235 236 237 238 239 240 241 242 243
    pRead->curVersion = -1;
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    return -1;
  }

  pRead->curVersion = ver + 1;
  return 0;
}

L
fix  
Liu Jicong 已提交
244 245 246 247 248 249
int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **ppHead) {
  taosThreadMutexLock(&pRead->mutex);
  if (walReadWithHandle(pRead, ver) < 0) {
    taosThreadMutexUnlock(&pRead->mutex);
    return -1;
  }
L
Liu Jicong 已提交
250
  *ppHead = taosMemoryMalloc(sizeof(SWalReadHead) + pRead->pHead->head.bodyLen);
L
fix  
Liu Jicong 已提交
251 252 253 254
  if (*ppHead == NULL) {
    taosThreadMutexUnlock(&pRead->mutex);
    return -1;
  }
L
Liu Jicong 已提交
255
  memcpy(*ppHead, &pRead->pHead->head, sizeof(SWalReadHead) + pRead->pHead->head.bodyLen);
L
fix  
Liu Jicong 已提交
256 257 258 259
  taosThreadMutexUnlock(&pRead->mutex);
  return 0;
}

L
Liu Jicong 已提交
260
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
L
Liu Jicong 已提交
261
  int64_t code;
262 263 264 265 266 267

  if (pRead->pWal->vers.firstVer == -1) {
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

L
Liu Jicong 已提交
268 269
  // TODO: check wal life
  if (pRead->curVersion != ver) {
L
Liu Jicong 已提交
270
    if (walReadSeekVer(pRead, ver) < 0) {
L
Liu Jicong 已提交
271
      wError("unexpected wal log version: % " PRId64 ", since %s", ver, terrstr());
L
Liu Jicong 已提交
272 273
      return -1;
    }
L
Liu Jicong 已提交
274 275
  }

M
Minghao Li 已提交
276 277 278 279 280 281 282
  if (ver > pRead->pWal->vers.lastVer || ver < pRead->pWal->vers.firstVer) {
    wError("invalid version: % " PRId64 ", first ver %ld, last ver %ld", ver, pRead->pWal->vers.firstVer,
           pRead->pWal->vers.lastVer);
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

283
  ASSERT(taosValidFile(pRead->pReadLogTFile) == true);
L
Liu Jicong 已提交
284

285
  code = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalHead));
L
Liu Jicong 已提交
286
  if (code != sizeof(SWalHead)) {
L
Liu Jicong 已提交
287 288
    if (code < 0)
      terrno = TAOS_SYSTEM_ERROR(errno);
M
Minghao Li 已提交
289
    else {
L
Liu Jicong 已提交
290
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
M
Minghao Li 已提交
291 292
      ASSERT(0);
    }
L
Liu Jicong 已提交
293 294
    return -1;
  }
295

L
Liu Jicong 已提交
296
  code = walValidHeadCksum(pRead->pHead);
L
Liu Jicong 已提交
297
  if (code != 0) {
L
fix  
Liu Jicong 已提交
298
    wError("unexpected wal log version: % " PRId64 ", since head checksum not passed", ver);
L
Liu Jicong 已提交
299
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
L
Liu Jicong 已提交
300 301
    return -1;
  }
302

L
Liu Jicong 已提交
303 304
  if (pRead->capacity < pRead->pHead->head.bodyLen) {
    void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.bodyLen);
L
Liu Jicong 已提交
305
    if (ptr == NULL) {
L
Liu Jicong 已提交
306
      terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
L
Liu Jicong 已提交
307 308
      return -1;
    }
L
Liu Jicong 已提交
309
    pRead->pHead = ptr;
L
Liu Jicong 已提交
310
    pRead->capacity = pRead->pHead->head.bodyLen;
L
Liu Jicong 已提交
311
  }
L
Liu Jicong 已提交
312

L
Liu Jicong 已提交
313 314 315 316
  if ((code = taosReadFile(pRead->pReadLogTFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) !=
      pRead->pHead->head.bodyLen) {
    if (code < 0)
      terrno = TAOS_SYSTEM_ERROR(errno);
M
Minghao Li 已提交
317
    else {
L
Liu Jicong 已提交
318
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
M
Minghao Li 已提交
319 320
      ASSERT(0);
    }
L
Liu Jicong 已提交
321 322
    return -1;
  }
L
Liu Jicong 已提交
323

L
Liu Jicong 已提交
324
  if (pRead->pHead->head.version != ver) {
L
Liu Jicong 已提交
325 326
    wError("unexpected wal log version: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version,
           ver);
L
Liu Jicong 已提交
327 328 329 330
    pRead->curVersion = -1;
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    return -1;
  }
L
Liu Jicong 已提交
331

L
Liu Jicong 已提交
332
  code = walValidBodyCksum(pRead->pHead);
L
Liu Jicong 已提交
333
  if (code != 0) {
L
fix  
Liu Jicong 已提交
334
    wError("unexpected wal log version: % " PRId64 ", since body checksum not passed", ver);
L
Liu Jicong 已提交
335
    pRead->curVersion = -1;
L
Liu Jicong 已提交
336
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
L
Liu Jicong 已提交
337 338
    return -1;
  }
L
Liu Jicong 已提交
339
  pRead->curVersion++;
L
Liu Jicong 已提交
340 341 342 343

  return 0;
}

L
Liu Jicong 已提交
344
#if 0
L
Liu Jicong 已提交
345
int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
L
Liu Jicong 已提交
346 347
  int code;
  code = walSeekVer(pWal, ver);
L
Liu Jicong 已提交
348
  if (code != 0) {
L
Liu Jicong 已提交
349 350
    return code;
  }
L
Liu Jicong 已提交
351
  if (*ppHead == NULL) {
wafwerar's avatar
wafwerar 已提交
352
    void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalHead));
L
Liu Jicong 已提交
353
    if (ptr == NULL) {
L
Liu Jicong 已提交
354 355 356 357
      return -1;
    }
    *ppHead = ptr;
  }
358
  if (tfRead(pWal->pWriteLogTFile, *ppHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
L
Liu Jicong 已提交
359 360
    return -1;
  }
L
Liu Jicong 已提交
361 362
  // TODO: endian compatibility processing after read
  if (walValidHeadCksum(*ppHead) != 0) {
L
Liu Jicong 已提交
363 364
    return -1;
  }
wafwerar's avatar
wafwerar 已提交
365
  void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalHead) + (*ppHead)->head.len);
L
Liu Jicong 已提交
366
  if (ptr == NULL) {
wafwerar's avatar
wafwerar 已提交
367
    taosMemoryFree(*ppHead);
L
Liu Jicong 已提交
368 369 370
    *ppHead = NULL;
    return -1;
  }
371
  if (tfRead(pWal->pWriteLogTFile, (*ppHead)->head.body, (*ppHead)->head.len) != (*ppHead)->head.len) {
L
Liu Jicong 已提交
372 373
    return -1;
  }
L
Liu Jicong 已提交
374 375
  // TODO: endian compatibility processing after read
  if (walValidBodyCksum(*ppHead) != 0) {
L
Liu Jicong 已提交
376 377
    return -1;
  }
L
Liu Jicong 已提交
378

L
Liu Jicong 已提交
379 380
  return 0;
}
S
Shengliang Guan 已提交
381

L
Liu Jicong 已提交
382 383 384 385
int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) {
return 0;
}
#endif