walRead.c 5.2 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
S
Shengliang Guan 已提交
14 15
 */

L
Liu Jicong 已提交
16
#include "tfile.h"
L
Liu Jicong 已提交
17
#include "walInt.h"
S
Shengliang Guan 已提交
18

L
Liu Jicong 已提交
19
SWalReadHandle *walOpenReadHandle(SWal *pWal) {
L
Liu Jicong 已提交
20
  SWalReadHandle *pRead = malloc(sizeof(SWalReadHandle));
L
Liu Jicong 已提交
21
  if (pRead == NULL) {
L
Liu Jicong 已提交
22 23 24 25 26
    return NULL;
  }
  pRead->pWal = pWal;
  pRead->readIdxTfd = -1;
  pRead->readLogTfd = -1;
L
Liu Jicong 已提交
27 28 29 30 31
  pRead->curVersion = -1;
  pRead->curFileFirstVer = -1;
  pRead->capacity = 0;
  pRead->status = 0;
  pRead->pHead = malloc(sizeof(SWalHead));
L
Liu Jicong 已提交
32
  if (pRead->pHead == NULL) {
L
Liu Jicong 已提交
33 34 35 36
    free(pRead);
    return NULL;
  }
  return pRead;
L
Liu Jicong 已提交
37 38 39 40 41
}

void walCloseReadHandle(SWalReadHandle *pRead) {
  tfClose(pRead->readIdxTfd);
  tfClose(pRead->readLogTfd);
L
Liu Jicong 已提交
42
  tfree(pRead->pHead);
L
Liu Jicong 已提交
43 44 45
  free(pRead);
}

L
Liu Jicong 已提交
46
int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) { return 0; }
L
Liu Jicong 已提交
47 48 49 50 51 52

static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, int64_t ver) {
  int code = 0;

  int64_t idxTfd = pRead->readIdxTfd;
  int64_t logTfd = pRead->readLogTfd;
L
Liu Jicong 已提交
53 54

  // seek position
L
Liu Jicong 已提交
55
  int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry);
L
Liu Jicong 已提交
56
  code = tfLseek(idxTfd, offset, SEEK_SET);
L
Liu Jicong 已提交
57
  if (code < 0) {
L
Liu Jicong 已提交
58 59
    return -1;
  }
L
Liu Jicong 已提交
60 61
  SWalIdxEntry entry;
  if (tfRead(idxTfd, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
L
Liu Jicong 已提交
62 63
    return -1;
  }
L
Liu Jicong 已提交
64
  // TODO:deserialize
L
Liu Jicong 已提交
65 66
  ASSERT(entry.ver == ver);
  code = tfLseek(logTfd, entry.offset, SEEK_SET);
L
Liu Jicong 已提交
67
  if (code < 0) {
L
Liu Jicong 已提交
68 69 70 71 72 73 74 75 76 77 78 79
    return -1;
  }
  return code;
}

static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
  char fnameStr[WAL_FILE_LEN];

  tfClose(pRead->readIdxTfd);
  tfClose(pRead->readLogTfd);

  walBuildLogName(pRead->pWal, fileFirstVer, fnameStr);
L
Liu Jicong 已提交
80
  int64_t logTfd = tfOpenRead(fnameStr);
L
Liu Jicong 已提交
81
  if (logTfd < 0) {
L
Liu Jicong 已提交
82 83 84 85
    return -1;
  }

  walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr);
L
Liu Jicong 已提交
86
  int64_t idxTfd = tfOpenRead(fnameStr);
L
Liu Jicong 已提交
87
  if (idxTfd < 0) {
L
Liu Jicong 已提交
88 89 90 91 92 93 94 95 96
    return -1;
  }

  pRead->readLogTfd = logTfd;
  pRead->readIdxTfd = idxTfd;
  return 0;
}

static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
L
Liu Jicong 已提交
97
  int   code;
L
Liu Jicong 已提交
98
  SWal *pWal = pRead->pWal;
L
Liu Jicong 已提交
99
  if (ver == pRead->curVersion) {
L
Liu Jicong 已提交
100 101
    return 0;
  }
L
Liu Jicong 已提交
102
  if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
L
Liu Jicong 已提交
103 104
    return -1;
  }
L
Liu Jicong 已提交
105
  if (ver < pWal->vers.snapshotVer) {
L
Liu Jicong 已提交
106 107
  }

L
Liu Jicong 已提交
108
  SWalFileInfo tmpInfo;
L
Liu Jicong 已提交
109
  tmpInfo.firstVer = ver;
L
Liu Jicong 已提交
110
  // bsearch in fileSet
L
Liu Jicong 已提交
111
  SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
L
Liu Jicong 已提交
112
  ASSERT(pRet != NULL);
L
Liu Jicong 已提交
113
  if (pRead->curFileFirstVer != pRet->firstVer) {
L
Liu Jicong 已提交
114
    code = walReadChangeFile(pRead, pRet->firstVer);
L
Liu Jicong 已提交
115 116
    if (code < 0) {
      // TODO: set error flag
L
Liu Jicong 已提交
117 118 119 120 121
      return -1;
    }
  }

  code = walReadSeekFilePos(pRead, pRet->firstVer, ver);
L
Liu Jicong 已提交
122
  if (code < 0) {
L
Liu Jicong 已提交
123 124 125
    return -1;
  }
  pRead->curVersion = ver;
L
Liu Jicong 已提交
126

L
Liu Jicong 已提交
127 128 129 130 131
  return 0;
}

int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
  int code;
L
Liu Jicong 已提交
132 133 134 135
  // TODO: check wal life
  if (pRead->curVersion != ver) {
    code = walReadSeekVer(pRead, ver);
    if (code != 0) {
L
Liu Jicong 已提交
136 137
      return -1;
    }
L
Liu Jicong 已提交
138 139
  }

L
Liu Jicong 已提交
140
  if (!tfValid(pRead->readLogTfd)) return -1;
L
Liu Jicong 已提交
141

L
Liu Jicong 已提交
142
  code = tfRead(pRead->readLogTfd, pRead->pHead, sizeof(SWalHead));
L
Liu Jicong 已提交
143
  if (code != sizeof(SWalHead)) {
L
Liu Jicong 已提交
144 145
    return -1;
  }
L
Liu Jicong 已提交
146
  code = walValidHeadCksum(pRead->pHead);
L
Liu Jicong 已提交
147
  if (code != 0) {
L
Liu Jicong 已提交
148 149
    return -1;
  }
L
Liu Jicong 已提交
150 151 152
  if (pRead->capacity < pRead->pHead->head.len) {
    void *ptr = realloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.len);
    if (ptr == NULL) {
L
Liu Jicong 已提交
153 154
      return -1;
    }
L
Liu Jicong 已提交
155 156
    pRead->pHead = ptr;
    pRead->capacity = pRead->pHead->head.len;
L
Liu Jicong 已提交
157
  }
L
Liu Jicong 已提交
158
  if (pRead->pHead->head.len != tfRead(pRead->readLogTfd, pRead->pHead->head.body, pRead->pHead->head.len)) {
L
Liu Jicong 已提交
159 160
    return -1;
  }
L
Liu Jicong 已提交
161 162 163

  ASSERT(pRead->pHead->head.version == ver);

L
Liu Jicong 已提交
164
  code = walValidBodyCksum(pRead->pHead);
L
Liu Jicong 已提交
165
  if (code != 0) {
L
Liu Jicong 已提交
166 167
    return -1;
  }
L
Liu Jicong 已提交
168
  pRead->curVersion++;
L
Liu Jicong 已提交
169 170 171 172

  return 0;
}

L
Liu Jicong 已提交
173
#if 0
L
Liu Jicong 已提交
174
int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
L
Liu Jicong 已提交
175 176
  int code;
  code = walSeekVer(pWal, ver);
L
Liu Jicong 已提交
177
  if (code != 0) {
L
Liu Jicong 已提交
178 179
    return code;
  }
L
Liu Jicong 已提交
180 181 182
  if (*ppHead == NULL) {
    void *ptr = realloc(*ppHead, sizeof(SWalHead));
    if (ptr == NULL) {
L
Liu Jicong 已提交
183 184 185 186
      return -1;
    }
    *ppHead = ptr;
  }
L
Liu Jicong 已提交
187
  if (tfRead(pWal->writeLogTfd, *ppHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
L
Liu Jicong 已提交
188 189
    return -1;
  }
L
Liu Jicong 已提交
190 191
  // TODO: endian compatibility processing after read
  if (walValidHeadCksum(*ppHead) != 0) {
L
Liu Jicong 已提交
192 193
    return -1;
  }
L
Liu Jicong 已提交
194 195
  void *ptr = realloc(*ppHead, sizeof(SWalHead) + (*ppHead)->head.len);
  if (ptr == NULL) {
L
Liu Jicong 已提交
196 197 198 199
    free(*ppHead);
    *ppHead = NULL;
    return -1;
  }
L
Liu Jicong 已提交
200
  if (tfRead(pWal->writeLogTfd, (*ppHead)->head.body, (*ppHead)->head.len) != (*ppHead)->head.len) {
L
Liu Jicong 已提交
201 202
    return -1;
  }
L
Liu Jicong 已提交
203 204
  // TODO: endian compatibility processing after read
  if (walValidBodyCksum(*ppHead) != 0) {
L
Liu Jicong 已提交
205 206
    return -1;
  }
L
Liu Jicong 已提交
207

L
Liu Jicong 已提交
208 209
  return 0;
}
S
Shengliang Guan 已提交
210

L
Liu Jicong 已提交
211 212 213 214
int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) {
return 0;
}
#endif