sdbFile.c 8.6 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "sdbInt.h"
S
Shengliang Guan 已提交
18
#include "tchecksum.h"
S
Shengliang Guan 已提交
19

S
Shengliang Guan 已提交
20 21 22
#define SDB_TABLE_SIZE 24
#define SDB_RESERVE_SIZE 512

S
Shengliang Guan 已提交
23
/*
 * Run every registered deploy callback, walking the table slots from
 * SDB_MAX - 1 down to 0. Slots without a callback are skipped.
 *
 * Returns 0 when all callbacks succeed, or -1 as soon as one of them
 * returns non-zero (the failure is logged using terrstr()).
 */
static int32_t sdbRunDeployFp(SSdb *pSdb) {
  mDebug("start to deploy sdb");

  int32_t type = SDB_MAX;
  while (--type >= 0) {
    SdbDeployFp deployFp = pSdb->deployFps[type];

    // A NULL slot short-circuits, so only registered callbacks are invoked.
    if (deployFp != NULL && (*deployFp)(pSdb->pMnode) != 0) {
      mError("failed to deploy sdb:%d since %s", type, terrstr());
      return -1;
    }
  }

  mDebug("sdb deploy successfully");
  return 0;
}

S
Shengliang Guan 已提交
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
/*
 * Read exactly `size` bytes of header data from fd into pBuf.
 *
 * Returns 0 on success. On failure returns -1 with terrno set to the system
 * error when the read itself fails, or to TSDB_CODE_FILE_CORRUPTED when
 * fewer bytes than requested were returned.
 */
static int32_t sdbReadHeadField(FileFd fd, void *pBuf, int32_t size) {
  int32_t nread = taosReadFile(fd, pBuf, size);
  if (nread < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (nread != size) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }
  return 0;
}

/*
 * Read the sdb file header from fd into pSdb.
 *
 * The header is read in this order:
 *   - curVer                          (one int64_t)
 *   - SDB_TABLE_SIZE maxId slots      (int64_t each)
 *   - SDB_TABLE_SIZE tableVer slots   (int64_t each)
 *   - SDB_RESERVE_SIZE reserved bytes (read and discarded)
 * Slots whose index is >= SDB_MAX are consumed from the file but not stored.
 *
 * Returns 0 on success, -1 on failure with terrno set.
 */
static int32_t sdbReadFileHead(SSdb *pSdb, FileFd fd) {
  if (sdbReadHeadField(fd, &pSdb->curVer, sizeof(int64_t)) != 0) {
    return -1;
  }

  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t maxId = -1;
    if (sdbReadHeadField(fd, &maxId, sizeof(int64_t)) != 0) {
      return -1;
    }
    if (slot < SDB_MAX) {
      pSdb->maxId[slot] = maxId;
    }
  }

  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t tableVer = -1;
    if (sdbReadHeadField(fd, &tableVer, sizeof(int64_t)) != 0) {
      return -1;
    }
    if (slot < SDB_MAX) {
      pSdb->tableVer[slot] = tableVer;
    }
  }

  // The reserved area carries no data; it is read only to advance past it.
  char reserve[SDB_RESERVE_SIZE] = {0};
  return sdbReadHeadField(fd, reserve, sizeof(reserve));
}

/*
 * Write exactly `size` bytes of header data from pBuf to fd.
 *
 * Returns 0 on success, or -1 with terrno set to the system error when the
 * number of bytes written differs from `size`.
 */
static int32_t sdbWriteHeadField(FileFd fd, void *pBuf, int32_t size) {
  if (taosWriteFile(fd, pBuf, size) != size) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  return 0;
}

/*
 * Write the sdb file header for pSdb to fd.
 *
 * The header is written in this order:
 *   - curVer                          (one int64_t)
 *   - SDB_TABLE_SIZE maxId slots      (int64_t each)
 *   - SDB_TABLE_SIZE tableVer slots   (int64_t each)
 *   - SDB_RESERVE_SIZE zero bytes
 * Slots whose index is >= SDB_MAX are written as -1.
 *
 * Returns 0 on success, -1 on failure with terrno set.
 */
static int32_t sdbWriteFileHead(SSdb *pSdb, FileFd fd) {
  if (sdbWriteHeadField(fd, &pSdb->curVer, sizeof(int64_t)) != 0) {
    return -1;
  }

  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t maxId = (slot < SDB_MAX) ? pSdb->maxId[slot] : -1;
    if (sdbWriteHeadField(fd, &maxId, sizeof(int64_t)) != 0) {
      return -1;
    }
  }

  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t tableVer = (slot < SDB_MAX) ? pSdb->tableVer[slot] : -1;
    if (sdbWriteHeadField(fd, &tableVer, sizeof(int64_t)) != 0) {
      return -1;
    }
  }

  char reserve[SDB_RESERVE_SIZE] = {0};
  return sdbWriteHeadField(fd, reserve, sizeof(reserve));
}

S
Shengliang Guan 已提交
134 135 136 137 138 139
/*
 * Load "<currDir>/sdb.data" into the in-memory sdb.
 *
 * After the header (see sdbReadFileHead) the file is a sequence of records.
 * Each record is an SSdbRaw header, followed by dataLen payload bytes and a
 * trailing int32_t checksum covering header and payload. Every record that
 * passes the checksum is passed to sdbWriteNotFree(). Reading stops at a
 * clean end of file on a record boundary.
 *
 * Returns:
 *   0   on success; lastCommitVer is then set to curVer.
 *   0   also when the file cannot be opened (terrno is set and the failure
 *       is logged).
 *       NOTE(review): presumably a missing file is meant to be non-fatal;
 *       confirm against the callers before changing this.
 *   -1  when the record buffer cannot be allocated or the header cannot be
 *       read (terrno set).
 *   otherwise the non-zero error code of the first failing record, which is
 *       also stored in terrno.
 */
int32_t sdbReadFile(SSdb *pSdb) {
  int32_t code = 0;
  int32_t readLen = 0;
  int64_t ret = 0;

  SSdbRaw *pRaw = malloc(SDB_MAX_SIZE);
  if (pRaw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed read file since %s", terrstr());
    return -1;
  }

  char file[PATH_MAX] = {0};
  snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
  mDebug("start to read file:%s", file);

  FileFd fd = taosOpenFileRead(file);
  if (fd <= 0) {
    // Capture errno before free(), which is allowed to modify it.
    terrno = TAOS_SYSTEM_ERROR(errno);
    free(pRaw);
    mError("failed to read file:%s since %s", file, terrstr());
    return 0;
  }

  if (sdbReadFileHead(pSdb, fd) != 0) {
    mError("failed to read file:%s head since %s", file, terrstr());
    pSdb->curVer = -1;
    free(pRaw);
    taosCloseFile(fd);
    return -1;
  }

  // Largest payload that still fits in pRaw together with the record header
  // and the trailing checksum.
  const int64_t maxDataLen = (int64_t)SDB_MAX_SIZE - (int64_t)sizeof(SSdbRaw) - (int64_t)sizeof(int32_t);

  while (1) {
    readLen = sizeof(SSdbRaw);
    ret = taosReadFile(fd, pRaw, readLen);
    if (ret == 0) break;  // end of file on a record boundary

    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto SDB_READ_FILE_END;
    }

    if (ret != readLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto SDB_READ_FILE_END;
    }

    // dataLen comes straight from the file: reject values that would make
    // the payload read run past the end of the pRaw buffer.
    int64_t dataLen = pRaw->dataLen;
    if (dataLen < 0 || dataLen > maxDataLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto SDB_READ_FILE_END;
    }

    // Payload plus the trailing checksum.
    readLen = pRaw->dataLen + sizeof(int32_t);
    ret = taosReadFile(fd, pRaw->pData, readLen);
    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto SDB_READ_FILE_END;
    }

    if (ret != readLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto SDB_READ_FILE_END;
    }

    int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t);
    if (!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen)) {
      code = TSDB_CODE_CHECKSUM_ERROR;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto SDB_READ_FILE_END;
    }

    code = sdbWriteNotFree(pSdb, pRaw);
    if (code != 0) {
      mError("failed to read file:%s since %s", file, terrstr());
      goto SDB_READ_FILE_END;
    }
  }

  // Only reached after a clean end of file; every failure above jumps
  // straight to the cleanup label with its error code intact.
  code = 0;
  pSdb->lastCommitVer = pSdb->curVer;
  mDebug("read file:%s successfully, ver:%" PRId64, file, pSdb->lastCommitVer);

SDB_READ_FILE_END:
  taosCloseFile(fd);
  sdbFreeRaw(pRaw);

  terrno = code;
  return code;
}

S
Shengliang Guan 已提交
224 225 226
/*
 * Persist the in-memory sdb to "<currDir>/sdb.data".
 *
 * The image is first written to "<tmpDir>/sdb.data": the header (see
 * sdbWriteFileHead), then, for each table that has an encode callback
 * (from SDB_MAX - 1 down to 0), every row whose status is SDB_STATUS_READY
 * as an encoded SSdbRaw record followed by its int32_t checksum. The
 * temporary file is fsynced, closed and then renamed over the current file.
 * Writing stops at the first row that fails to encode or write.
 *
 * Returns:
 *   0   on success; lastCommitVer is then set to curVer.
 *   -1  when the temporary file cannot be opened or the header cannot be
 *       written (terrno set).
 *   otherwise the non-zero error code of the failing step, which is also
 *       stored in terrno.
 */
int32_t sdbWriteFile(SSdb *pSdb) {
  int32_t code = 0;

  char tmpfile[PATH_MAX] = {0};
  snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
  char curfile[PATH_MAX] = {0};
  snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);

  mDebug("start to write file:%s", curfile);

  FileFd fd = taosOpenFileCreateWrite(tmpfile);
  if (fd <= 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to open file:%s for write since %s", tmpfile, terrstr());
    return -1;
  }

  if (sdbWriteFileHead(pSdb, fd) != 0) {
    mError("failed to write file:%s head since %s", tmpfile, terrstr());
    taosCloseFile(fd);
    return -1;
  }

  for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
    SdbEncodeFp encodeFp = pSdb->encodeFps[i];
    if (encodeFp == NULL) continue;

    mTrace("write %s to file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));

    SHashObj *hash = pSdb->hashObjs[i];
    SRWLatch *pLock = &pSdb->locks[i];
    taosWLockLatch(pLock);

    SSdbRow **ppRow = taosHashIterate(hash, NULL);
    while (ppRow != NULL) {
      SSdbRow *pRow = *ppRow;
      if (pRow == NULL || pRow->status != SDB_STATUS_READY) {
        ppRow = taosHashIterate(hash, ppRow);
        continue;
      }

      sdbPrintOper(pSdb, pRow, "writeFile");

      SSdbRaw *pRaw = (*encodeFp)(pRow->pObj);
      if (pRaw == NULL) {
        code = TSDB_CODE_SDB_APP_ERROR;
      } else {
        pRaw->status = pRow->status;

        // The record is written first, then its checksum.
        int32_t writeLen = sizeof(SSdbRaw) + pRaw->dataLen;
        int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, writeLen);
        if (taosWriteFile(fd, pRaw, writeLen) != writeLen ||
            taosWriteFile(fd, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
          // errno is captured before any cleanup call can change it.
          code = TAOS_SYSTEM_ERROR(errno);
        }
        sdbFreeRaw(pRaw);
      }

      if (code != 0) {
        // Leaving the iteration early requires cancelling it explicitly.
        taosHashCancelIterate(hash, ppRow);
        break;
      }

      ppRow = taosHashIterate(hash, ppRow);
    }
    taosWUnLockLatch(pLock);

    // The temporary file is discarded on failure, so do not keep locking
    // and encoding the remaining tables.
    if (code != 0) break;
  }

  if (code == 0 && taosFsyncFile(fd) != 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    mError("failed to write file:%s since %s", tmpfile, tstrerror(code));
  }

  taosCloseFile(fd);

  if (code == 0 && taosRenameFile(tmpfile, curfile) != 0) {
    // Logged once by the common failure branch below.
    code = TAOS_SYSTEM_ERROR(errno);
  }

  if (code != 0) {
    mError("failed to write file:%s since %s", curfile, tstrerror(code));
  } else {
    pSdb->lastCommitVer = pSdb->curVer;
    mDebug("write file:%s successfully, ver:%" PRId64, curfile, pSdb->lastCommitVer);
  }

  terrno = code;
  return code;
}

S
Shengliang Guan 已提交
326
/*
 * Deploy a fresh sdb: run all deploy callbacks, then write the result to
 * disk. The file is only written when every callback succeeded.
 *
 * Returns 0 on success, -1 if either step fails.
 */
int32_t sdbDeploy(SSdb *pSdb) {
  // Short-circuit evaluation keeps the original ordering: sdbWriteFile() is
  // not called when the deploy callbacks fail.
  int32_t failed = (sdbRunDeployFp(pSdb) != 0) || (sdbWriteFile(pSdb) != 0);
  return failed ? -1 : 0;
}