/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "taoserror.h"
#include "walInt.h"

// Allocate and initialise a read handle bound to pWal.
//
// The new handle has no index/log file open, curVersion and curFileFirstVer
// set to -1, capacity 0 and a head buffer sized for exactly one SWalHead.
//
// pWal:    WAL instance to read from; the pointer is stored, not copied.
// returns: the new handle, or NULL with terrno set when an allocation fails.
//          Release the handle with walCloseReadHandle().
SWalReadHandle *walOpenReadHandle(SWal *pWal) {
  SWalReadHandle *pRead = taosMemoryMalloc(sizeof(SWalReadHandle));
  if (pRead == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // Get the head buffer before the mutex is initialised, so the failure path
  // only has plain memory to release and never abandons an initialised mutex.
  pRead->pHead = taosMemoryMalloc(sizeof(SWalHead));
  if (pRead->pHead == NULL) {
    terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
    taosMemoryFree(pRead);
    return NULL;
  }

  pRead->pWal = pWal;
  pRead->pReadIdxTFile = NULL;
  pRead->pReadLogTFile = NULL;
  pRead->curVersion = -1;
  pRead->curFileFirstVer = -1;
  pRead->capacity = 0;
  pRead->status = 0;

  // NOTE(review): the result of the mutex initialisation is not checked.
  taosThreadMutexInit(&pRead->mutex, NULL);

  return pRead;
}

// Release a handle obtained from walOpenReadHandle(): close the index and log
// files, free the head buffer, then free the handle itself.
//
// pRead: handle to release; NULL is accepted and ignored, so the result of a
//        failed walOpenReadHandle() can be passed straight through.
void walCloseReadHandle(SWalReadHandle *pRead) {
  if (pRead == NULL) {
    return;
  }

  taosCloseFile(&pRead->pReadIdxTFile);
  taosCloseFile(&pRead->pReadLogTFile);
  taosMemoryFreeClear(pRead->pHead);
  // NOTE(review): pRead->mutex is initialised in walOpenReadHandle() but is
  // not destroyed here -- confirm whether a matching destroy call is needed.
  taosMemoryFree(pRead);
}

// Placeholder: performs no work and always reports success (0).
// Both arguments are currently ignored.
int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) {
  (void)pRead;
  (void)ver;
  return 0;
}

// Position the currently open log file at the record for `ver`.
//
// Reads the index entry for `ver` from the open index file and seeks the open
// log file to the offset stored in that entry. The index is addressed as an
// array of fixed-size SWalIdxEntry records whose first element describes
// `fileFirstVer`.
//
// pRead:        handle whose index/log files are already open.
// fileFirstVer: first version stored in the currently open file pair.
// ver:          version to locate.
// returns:      0 on success, -1 with terrno set on failure.
static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, int64_t ver) {
  TdFilePtr pIdxTFile = pRead->pReadIdxTFile;
  TdFilePtr pLogTFile = pRead->pReadLogTFile;

  // seek position
  int64_t idxOffset = (ver - fileFirstVer) * (int64_t)sizeof(SWalIdxEntry);
  // Test the seek result directly: storing it in an int first could truncate
  // a large offset into a negative value and report a failure that never
  // happened.
  if (taosLSeekFile(pIdxTFile, idxOffset, SEEK_SET) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  SWalIdxEntry entry;
  if (taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    return -1;
  }

  // TODO:deserialize
  ASSERT(entry.ver == ver);

  if (taosLSeekFile(pLogTFile, entry.offset, SEEK_SET) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  // Report plain success instead of leaking the seek result to the caller.
  return 0;
}

// Switch the handle to the log/index file pair that starts at `fileFirstVer`.
//
// Any files the handle currently holds are closed first. On success both new
// files are stored in the handle and curFileFirstVer records which pair is
// open; on failure nothing new is stored and curFileFirstVer is -1.
//
// pRead:        handle to update.
// fileFirstVer: first version of the file pair to open.
// returns:      0 on success, -1 with terrno set on failure.
static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
  char fnameStr[WAL_FILE_LEN];

  taosCloseFile(&pRead->pReadIdxTFile);
  taosCloseFile(&pRead->pReadLogTFile);
  // Until both files below are open the handle refers to no file pair.
  pRead->curFileFirstVer = -1;

  walBuildLogName(pRead->pWal, fileFirstVer, fnameStr);
  TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_READ);
  if (pLogTFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr);
  TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ);
  if (pIdxTFile == NULL) {
    // Record the error before closing, in case the close disturbs errno.
    terrno = TAOS_SYSTEM_ERROR(errno);
    // The log file was opened above and is not owned by the handle yet.
    taosCloseFile(&pLogTFile);
    return -1;
  }

  pRead->pReadLogTFile = pLogTFile;
  pRead->pReadIdxTFile = pIdxTFile;
  // Remember the open pair so a later seek inside the same file can skip
  // reopening (walReadSeekVer compares against this field).
  pRead->curFileFirstVer = fileFirstVer;
  return 0;
}

// Make `ver` the next version the handle will read.
//
// Returns immediately when the handle is already positioned at `ver`.
// Otherwise checks `ver` against the WAL's [firstVer, lastVer] range, finds
// the file whose first version is the closest one <= `ver`, switches files if
// that is not the pair recorded in curFileFirstVer, and seeks to the record.
//
// pRead:   handle to reposition.
// ver:     version to seek to.
// returns: 0 on success; -1 on failure. A range failure sets terrno to
//          TSDB_CODE_WAL_INVALID_VER and leaves the handle untouched. A
//          failure while switching files or seeking resets curVersion to -1,
//          so the next read is forced to seek again.
static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
  SWal *pWal = pRead->pWal;

  if (ver == pRead->curVersion) {
    return 0;
  }
  if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
    terrno = TSDB_CODE_WAL_INVALID_VER;
    return -1;
  }
  // TODO: versions below pWal->vers.snapshotVer get no special handling yet.

  SWalFileInfo tmpInfo;
  tmpInfo.firstVer = ver;
  // bsearch in fileSet
  SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
  ASSERT(pRet != NULL);

  if (pRead->curFileFirstVer != pRet->firstVer) {
    if (walReadChangeFile(pRead, pRet->firstVer) < 0) {
      // The previous files are gone, so the old position is meaningless.
      pRead->curVersion = -1;
      return -1;
    }
  }

  if (walReadSeekFilePos(pRead, pRet->firstVer, ver) < 0) {
    // One of the two file offsets may already have moved.
    pRead->curVersion = -1;
    return -1;
  }

  pRead->curVersion = ver;
  return 0;
}

// Mutex-protected read that hands the caller a private copy of the record.
//
// Holds pRead->mutex while reading version `ver` with walReadWithHandle() and
// while copying the record (SWalReadHead plus bodyLen body bytes) out of the
// handle's internal buffer.
//
// pRead:   handle to read through.
// ver:     version to read.
// ppHead:  receives a newly allocated copy on success; the caller owns it.
//          Left untouched when the read fails, set to NULL when the copy
//          cannot be allocated.
// returns: 0 on success, -1 on failure (terrno is set to
//          TSDB_CODE_WAL_OUT_OF_MEMORY when the copy cannot be allocated).
int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **ppHead) {
  int32_t code = -1;

  taosThreadMutexLock(&pRead->mutex);

  if (walReadWithHandle(pRead, ver) >= 0) {
    size_t        copyLen = sizeof(SWalReadHead) + pRead->pHead->head.bodyLen;
    SWalReadHead *pCopy = taosMemoryMalloc(copyLen);
    if (pCopy == NULL) {
      terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
    } else {
      memcpy(pCopy, &pRead->pHead->head, copyLen);
      code = 0;
    }
    *ppHead = pCopy;
  }

  taosThreadMutexUnlock(&pRead->mutex);
  return code;
}

// Read the record for version `ver` into the handle's head buffer.
//
// Seeks first when the handle is not already positioned at `ver`, then reads
// the fixed-size SWalHead, validates its checksum, grows the buffer when the
// body does not fit, reads the body, and validates the stored version and the
// body checksum. On success curVersion advances by one, so a following read
// of ver + 1 needs no seek.
//
// pRead:   handle to read through; the record is left in pRead->pHead.
// ver:     version to read.
// returns: 0 on success, -1 with terrno set on failure. Once this function
//          has gone past the seek step, every failure resets curVersion to
//          -1, because the log file offset may no longer match it.
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
  // TODO: check wal life
  if (pRead->curVersion != ver) {
    if (walReadSeekVer(pRead, ver) < 0) {
      return -1;
    }
  }

  if (!taosValidFile(pRead->pReadLogTFile)) {
    // No log file is open for the requested version.
    terrno = TSDB_CODE_WAL_INVALID_VER;
    goto _failed;
  }

  int64_t nRead = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalHead));
  if (nRead != (int64_t)sizeof(SWalHead)) {
    // A negative result is an I/O error; a short read means the record is
    // truncated.
    terrno = (nRead < 0) ? TAOS_SYSTEM_ERROR(errno) : TSDB_CODE_WAL_FILE_CORRUPTED;
    goto _failed;
  }

  if (walValidHeadCksum(pRead->pHead) != 0) {
    wError("unexpected wal log version: %" PRId64 ", since head checksum not passed", ver);
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    goto _failed;
  }

  if (pRead->capacity < pRead->pHead->head.bodyLen) {
    // Keep the old buffer reachable until the reallocation is known to work.
    void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.bodyLen);
    if (ptr == NULL) {
      terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
      goto _failed;
    }
    pRead->pHead = ptr;
    pRead->capacity = pRead->pHead->head.bodyLen;
  }

  nRead = taosReadFile(pRead->pReadLogTFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen);
  if (nRead != pRead->pHead->head.bodyLen) {
    terrno = (nRead < 0) ? TAOS_SYSTEM_ERROR(errno) : TSDB_CODE_WAL_FILE_CORRUPTED;
    goto _failed;
  }

  if (pRead->pHead->head.version != ver) {
    wError("unexpected wal log version: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version,
           ver);
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    goto _failed;
  }

  if (walValidBodyCksum(pRead->pHead) != 0) {
    wError("unexpected wal log version: %" PRId64 ", since body checksum not passed", ver);
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    goto _failed;
  }

  pRead->curVersion++;
  return 0;

_failed:
  // Force the next call to seek again instead of trusting a stale position.
  pRead->curVersion = -1;
  return -1;
}

#if 0
// Legacy reader kept for reference only: everything up to the matching #endif
// is compiled out.

// Read version `ver` into *ppHead, allocating or growing the buffer as needed.
// Returns 0 on success, a non-zero code from walSeekVer, or -1 on a read,
// allocation or checksum failure.
int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
  int code;
  code = walSeekVer(pWal, ver);
  if (code != 0) {
    return code;
  }
  if (*ppHead == NULL) {
    void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalHead));
    if (ptr == NULL) {
      return -1;
    }
    *ppHead = ptr;
  }
  if (tfRead(pWal->pWriteLogTFile, *ppHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
    return -1;
  }
  // TODO: endian compatibility processing after read
  if (walValidHeadCksum(*ppHead) != 0) {
    return -1;
  }
  void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalHead) + (*ppHead)->head.len);
  if (ptr == NULL) {
    taosMemoryFree(*ppHead);
    *ppHead = NULL;
    return -1;
  }
  // Adopt the grown buffer: the reallocation may have moved it, in which case
  // the old pointer must not be used again.
  *ppHead = ptr;
  if (tfRead(pWal->pWriteLogTFile, (*ppHead)->head.body, (*ppHead)->head.len) != (*ppHead)->head.len) {
    return -1;
  }
  // TODO: endian compatibility processing after read
  if (walValidBodyCksum(*ppHead) != 0) {
    return -1;
  }

  return 0;
}

// Placeholder: performs no work and returns 0.
int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) {
return 0;
}
#endif