sdbFile.c 8.8 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "sdbInt.h"
S
Shengliang Guan 已提交
18
#include "tchecksum.h"
S
Shengliang Guan 已提交
19

S
Shengliang Guan 已提交
20 21 22
#define SDB_TABLE_SIZE 24
#define SDB_RESERVE_SIZE 512

S
Shengliang Guan 已提交
23
static int32_t sdbRunDeployFp(SSdb *pSdb) {
S
Shengliang Guan 已提交
24
  mDebug("start to deploy sdb");
S
Shengliang Guan 已提交
25

S
Shengliang Guan 已提交
26
  for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
S
Shengliang Guan 已提交
27
    SdbDeployFp fp = pSdb->deployFps[i];
S
Shengliang Guan 已提交
28
    if (fp == NULL) continue;
S
Shengliang Guan 已提交
29

S
Shengliang Guan 已提交
30
    if ((*fp)(pSdb->pMnode) != 0) {
S
Shengliang Guan 已提交
31 32
      mError("failed to deploy sdb:%d since %s", i, terrstr());
      return -1;
S
Shengliang Guan 已提交
33 34 35
    }
  }

S
Shengliang Guan 已提交
36
  mDebug("sdb deploy successfully");
S
Shengliang Guan 已提交
37 38 39
  return 0;
}

S
Shengliang Guan 已提交
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
static int32_t sdbReadFileHead(SSdb *pSdb, FileFd fd) {
  int32_t ret = taosReadFile(fd, &pSdb->curVer, sizeof(int64_t));
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(int64_t)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
    int64_t maxId = -1;
    ret = taosReadFile(fd, &maxId, sizeof(int64_t));
    if (ret < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
    if (ret != sizeof(int64_t)) {
      terrno = TSDB_CODE_FILE_CORRUPTED;
      return -1;
    }
    if (i < SDB_MAX) {
      pSdb->maxId[i] = maxId;
    }
  }

  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
    int64_t ver = -1;
    ret = taosReadFile(fd, &ver, sizeof(int64_t));
    if (ret < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
    if (ret != sizeof(int64_t)) {
      terrno = TSDB_CODE_FILE_CORRUPTED;
      return -1;
    }
    if (i < SDB_MAX) {
      pSdb->tableVer[i] = ver;
    }
  }

  char reserve[SDB_RESERVE_SIZE] = {0};
S
Shengliang Guan 已提交
84
  ret = taosReadFile(fd, reserve, sizeof(reserve));
S
Shengliang Guan 已提交
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(reserve)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

  return 0;
}

static int32_t sdbWriteFileHead(SSdb *pSdb, FileFd fd) {
  if (taosWriteFile(fd, &pSdb->curVer, sizeof(int64_t)) != sizeof(int64_t)) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
    int64_t maxId = -1;
    if (i < SDB_MAX) {
      maxId = pSdb->maxId[i];
    }
    if (taosWriteFile(fd, &maxId, sizeof(int64_t)) != sizeof(int64_t)) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
  }

  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
    int64_t ver = -1;
    if (i < SDB_MAX) {
      ver = pSdb->tableVer[i];
    }
    if (taosWriteFile(fd, &ver, sizeof(int64_t)) != sizeof(int64_t)) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
  }

S
Shengliang Guan 已提交
125
  char reserve[SDB_RESERVE_SIZE] = {0};
S
Shengliang Guan 已提交
126 127 128 129 130 131 132 133
  if (taosWriteFile(fd, reserve, sizeof(reserve)) != sizeof(reserve)) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
134 135 136 137 138 139
int32_t sdbReadFile(SSdb *pSdb) {
  int64_t offset = 0;
  int32_t code = 0;
  int32_t readLen = 0;
  int64_t ret = 0;

S
Shengliang Guan 已提交
140 141
  SSdbRaw *pRaw = malloc(SDB_MAX_SIZE);
  if (pRaw == NULL) {
S
Shengliang Guan 已提交
142
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
143
    mError("failed read file since %s", terrstr());
S
Shengliang Guan 已提交
144
    return -1;
S
Shengliang Guan 已提交
145 146 147
  }

  char file[PATH_MAX] = {0};
S
Shengliang Guan 已提交
148
  snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
S
Shengliang Guan 已提交
149
  mDebug("start to read file:%s", file);
S
Shengliang Guan 已提交
150

S
Shengliang Guan 已提交
151
  FileFd fd = taosOpenFileRead(file);
S
Shengliang Guan 已提交
152
  if (fd <= 0) {
S
Shengliang Guan 已提交
153
    free(pRaw);
S
Shengliang Guan 已提交
154 155
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to read file:%s since %s", file, terrstr());
S
Shengliang Guan 已提交
156
    return 0;
S
Shengliang Guan 已提交
157 158
  }

S
Shengliang Guan 已提交
159 160 161 162 163 164 165 166
  if (sdbReadFileHead(pSdb, fd) != 0) {
    mError("failed to read file:%s head since %s", file, terrstr());
    pSdb->curVer = -1;
    free(pRaw);
    taosCloseFile(fd);
    return -1;
  }

S
Shengliang Guan 已提交
167
  while (1) {
S
Shengliang Guan 已提交
168 169
    readLen = sizeof(SSdbRaw);
    ret = taosReadFile(fd, pRaw, readLen);
S
Shengliang Guan 已提交
170 171 172 173 174 175 176 177
    if (ret == 0) break;

    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read file:%s since %s", file, tstrerror(code));
      break;
    }

S
Shengliang Guan 已提交
178
    if (ret != readLen) {
S
Shengliang Guan 已提交
179 180 181 182 183
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      break;
    }

S
Shengliang Guan 已提交
184 185
    readLen = pRaw->dataLen + sizeof(int32_t);
    ret = taosReadFile(fd, pRaw->pData, readLen);
S
Shengliang Guan 已提交
186 187 188 189 190 191
    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read file:%s since %s", file, tstrerror(code));
      break;
    }

S
Shengliang Guan 已提交
192
    if (ret != readLen) {
S
Shengliang Guan 已提交
193 194 195 196 197
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      break;
    }

S
Shengliang Guan 已提交
198 199
    int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t);
    if (!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen) != 0) {
S
Shengliang Guan 已提交
200
      code = TSDB_CODE_CHECKSUM_ERROR;
S
Shengliang Guan 已提交
201 202 203 204
      mError("failed to read file:%s since %s", file, tstrerror(code));
      break;
    }

S
Shengliang Guan 已提交
205
    code = sdbWriteNotFree(pSdb, pRaw);
S
Shengliang Guan 已提交
206
    if (code != 0) {
S
Shengliang Guan 已提交
207
      mError("failed to read file:%s since %s", file, terrstr());
S
Shengliang Guan 已提交
208 209 210 211 212
      goto PARSE_SDB_DATA_ERROR;
    }
  }

  code = 0;
S
Shengliang Guan 已提交
213
  pSdb->lastCommitVer = pSdb->curVer;
S
Shengliang Guan 已提交
214
  mDebug("read file:%s successfully, ver:%" PRId64, file, pSdb->lastCommitVer);
S
Shengliang Guan 已提交
215 216 217

PARSE_SDB_DATA_ERROR:
  taosCloseFile(fd);
S
Shengliang Guan 已提交
218
  sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
219

S
Shengliang Guan 已提交
220
  terrno = code;
S
Shengliang Guan 已提交
221 222 223
  return code;
}

S
Shengliang Guan 已提交
224
static int32_t sdbWriteFileImp(SSdb *pSdb) {
S
Shengliang Guan 已提交
225 226
  int32_t code = 0;

S
Shengliang Guan 已提交
227
  char tmpfile[PATH_MAX] = {0};
S
Shengliang Guan 已提交
228
  snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
S
Shengliang Guan 已提交
229
  char curfile[PATH_MAX] = {0};
S
Shengliang Guan 已提交
230
  snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
S
Shengliang Guan 已提交
231

S
Shengliang Guan 已提交
232 233
  mDebug("start to write file:%s, current ver:%" PRId64 ", commit ver:%" PRId64, curfile, pSdb->curVer,
         pSdb->lastCommitVer);
S
Shengliang Guan 已提交
234

S
Shengliang Guan 已提交
235
  FileFd fd = taosOpenFileCreateWriteTrunc(tmpfile);
S
Shengliang Guan 已提交
236
  if (fd <= 0) {
S
Shengliang Guan 已提交
237 238 239
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to open file:%s for write since %s", tmpfile, terrstr());
    return -1;
S
Shengliang Guan 已提交
240 241
  }

S
Shengliang Guan 已提交
242 243 244 245 246 247 248
  if (sdbWriteFileHead(pSdb, fd) != 0) {
    mError("failed to write file:%s head since %s", tmpfile, terrstr());
    taosCloseFile(fd);
    return -1;
  }

  for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
S
Shengliang Guan 已提交
249
    SdbEncodeFp encodeFp = pSdb->encodeFps[i];
S
Shengliang Guan 已提交
250 251
    if (encodeFp == NULL) continue;

S
Shengliang Guan 已提交
252
    mTrace("write %s to file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));
S
Shengliang Guan 已提交
253

S
Shengliang Guan 已提交
254 255
    SHashObj *hash = pSdb->hashObjs[i];
    SRWLatch *pLock = &pSdb->locks[i];
S
Shengliang Guan 已提交
256
    taosWLockLatch(pLock);
S
Shengliang Guan 已提交
257

S
Shengliang Guan 已提交
258 259 260 261 262 263 264
    SSdbRow **ppRow = taosHashIterate(hash, NULL);
    while (ppRow != NULL) {
      SSdbRow *pRow = *ppRow;
      if (pRow == NULL || pRow->status != SDB_STATUS_READY) {
        ppRow = taosHashIterate(hash, ppRow);
        continue;
      }
S
Shengliang Guan 已提交
265

S
Shengliang Guan 已提交
266 267
      sdbPrintOper(pSdb, pRow, "writeFile");

S
Shengliang Guan 已提交
268 269
      SSdbRaw *pRaw = (*encodeFp)(pRow->pObj);
      if (pRaw != NULL) {
S
Shengliang Guan 已提交
270
        pRaw->status = pRow->status;
S
Shengliang Guan 已提交
271 272
        int32_t writeLen = sizeof(SSdbRaw) + pRaw->dataLen;
        if (taosWriteFile(fd, pRaw, writeLen) != writeLen) {
S
Shengliang Guan 已提交
273
          code = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
274
          taosHashCancelIterate(hash, ppRow);
S
Shengliang Guan 已提交
275
          sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
276 277
          break;
        }
S
Shengliang Guan 已提交
278 279

        int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
S
Shengliang Guan 已提交
280
        if (taosWriteFile(fd, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
S
Shengliang Guan 已提交
281
          code = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
282
          taosHashCancelIterate(hash, ppRow);
S
Shengliang Guan 已提交
283
          sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
284 285
          break;
        }
S
Shengliang Guan 已提交
286 287
      } else {
        code = TSDB_CODE_SDB_APP_ERROR;
S
Shengliang Guan 已提交
288
        taosHashCancelIterate(hash, ppRow);
S
Shengliang Guan 已提交
289 290 291
        break;
      }

S
Shengliang Guan 已提交
292
      sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
293
      ppRow = taosHashIterate(hash, ppRow);
S
Shengliang Guan 已提交
294
    }
S
Shengliang Guan 已提交
295
    taosWUnLockLatch(pLock);
S
Shengliang Guan 已提交
296 297 298 299
  }

  if (code == 0) {
    code = taosFsyncFile(fd);
S
Shengliang Guan 已提交
300 301 302 303
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to write file:%s since %s", tmpfile, tstrerror(code));
    }
S
Shengliang Guan 已提交
304 305
  }

S
Shengliang Guan 已提交
306 307
  taosCloseFile(fd);

S
Shengliang Guan 已提交
308
  if (code == 0) {
S
Shengliang Guan 已提交
309
    code = taosRenameFile(tmpfile, curfile);
S
Shengliang Guan 已提交
310 311 312 313
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to write file:%s since %s", curfile, tstrerror(code));
    }
S
Shengliang Guan 已提交
314 315 316
  }

  if (code != 0) {
S
Shengliang Guan 已提交
317
    mError("failed to write file:%s since %s", curfile, tstrerror(code));
S
Shengliang Guan 已提交
318
  } else {
S
Shengliang Guan 已提交
319 320
    pSdb->lastCommitVer = pSdb->curVer;
    mDebug("write file:%s successfully, ver:%" PRId64, curfile, pSdb->lastCommitVer);
S
Shengliang Guan 已提交
321 322
  }

S
Shengliang Guan 已提交
323
  terrno = code;
S
Shengliang Guan 已提交
324 325 326
  return code;
}

S
Shengliang Guan 已提交
327 328 329 330 331 332 333 334
int32_t sdbWriteFile(SSdb *pSdb) {
  if (pSdb->curVer == pSdb->lastCommitVer) {
    return 0;
  }

  return sdbWriteFileImp(pSdb);
}

S
Shengliang Guan 已提交
335
int32_t sdbDeploy(SSdb *pSdb) {
S
Shengliang Guan 已提交
336 337
  if (sdbRunDeployFp(pSdb) != 0) {
    return -1;
S
Shengliang Guan 已提交
338 339
  }

S
Shengliang Guan 已提交
340
  if (sdbWriteFileImp(pSdb) != 0) {
S
Shengliang Guan 已提交
341
    return -1;
S
Shengliang Guan 已提交
342 343 344 345
  }

  return 0;
}