sdbFile.c 9.8 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "sdbInt.h"
S
Shengliang Guan 已提交
18
#include "tchecksum.h"
S
Shengliang Guan 已提交
19
#include "wal.h"
S
Shengliang Guan 已提交
20

S
Shengliang Guan 已提交
21
#define SDB_TABLE_SIZE   24
S
Shengliang Guan 已提交
22
#define SDB_RESERVE_SIZE 512
S
Shengliang Guan 已提交
23
#define SDB_FILE_VER     1
S
Shengliang Guan 已提交
24

S
Shengliang Guan 已提交
25
/*
 * Run every registered deploy callback, visiting the table slots from
 * SDB_MAX - 1 down to 0. Slots whose callback is NULL are skipped.
 *
 * Returns 0 if all callbacks returned 0. Returns -1 as soon as one callback
 * returns non-zero; the failing table name and terrstr() are logged.
 */
static int32_t sdbRunDeployFp(SSdb *pSdb) {
  mDebug("start to deploy sdb");

  int32_t type = SDB_MAX;
  while (--type >= 0) {
    SdbDeployFp deployFp = pSdb->deployFps[type];
    if (deployFp != NULL && (*deployFp)(pSdb->pMnode) != 0) {
      mError("failed to deploy sdb:%s since %s", sdbTableName(type), terrstr());
      return -1;
    }
  }

  mDebug("sdb deploy successfully");
  return 0;
}

42
/*
 * Read exactly `len` bytes of header data from pFile into buf.
 *
 * Returns 0 on success. On failure returns -1 with terrno set to the
 * system error (read returned a negative value) or to
 * TSDB_CODE_FILE_CORRUPTED (fewer bytes than requested were read).
 */
static int32_t sdbReadHeadBytes(TdFilePtr pFile, void *buf, int32_t len) {
  int32_t nread = taosReadFile(pFile, buf, len);
  if (nread < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (nread != len) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }
  return 0;
}

/*
 * Parse the fixed-size file header from pFile into pSdb.
 *
 * The header is read in this order:
 *   - int64 file format version, which must equal SDB_FILE_VER
 *   - int64 current version, stored into pSdb->curVer
 *   - SDB_TABLE_SIZE int64 max ids   (only the first SDB_MAX are kept)
 *   - SDB_TABLE_SIZE int64 versions  (only the first SDB_MAX are kept)
 *   - SDB_RESERVE_SIZE reserved bytes, read and discarded
 *
 * Returns 0 on success, -1 on failure with terrno set.
 */
static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
  int64_t fileVer = 0;
  if (sdbReadHeadBytes(pFile, &fileVer, sizeof(int64_t)) != 0) {
    return -1;
  }
  if (fileVer != SDB_FILE_VER) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

  if (sdbReadHeadBytes(pFile, &pSdb->curVer, sizeof(int64_t)) != 0) {
    return -1;
  }

  /* The file always carries SDB_TABLE_SIZE slots; extra slots are consumed but ignored. */
  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t id = 0;
    if (sdbReadHeadBytes(pFile, &id, sizeof(int64_t)) != 0) {
      return -1;
    }
    if (slot < SDB_MAX) {
      pSdb->maxId[slot] = id;
    }
  }

  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t tableVer = 0;
    if (sdbReadHeadBytes(pFile, &tableVer, sizeof(int64_t)) != 0) {
      return -1;
    }
    if (slot < SDB_MAX) {
      pSdb->tableVer[slot] = tableVer;
    }
  }

  char reserve[SDB_RESERVE_SIZE] = {0};
  return sdbReadHeadBytes(pFile, reserve, sizeof(reserve));
}

114
/*
 * Write exactly `len` bytes of header data from buf to pFile.
 *
 * Returns 0 on success. If the write does not report `len` bytes, returns -1
 * with terrno set from errno.
 */
static int32_t sdbWriteHeadBytes(TdFilePtr pFile, const void *buf, int32_t len) {
  int64_t written = taosWriteFile(pFile, buf, len);
  if (written != len) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  return 0;
}

/*
 * Emit the fixed-size file header for pSdb to pFile.
 *
 * The header is written in this order:
 *   - int64 SDB_FILE_VER
 *   - int64 pSdb->curVer
 *   - SDB_TABLE_SIZE int64 max ids   (slots >= SDB_MAX are written as 0)
 *   - SDB_TABLE_SIZE int64 versions  (slots >= SDB_MAX are written as 0)
 *   - SDB_RESERVE_SIZE zero bytes
 *
 * Returns 0 on success, -1 on failure with terrno set.
 */
static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
  int64_t fileVer = SDB_FILE_VER;
  if (sdbWriteHeadBytes(pFile, &fileVer, sizeof(int64_t)) != 0) {
    return -1;
  }

  if (sdbWriteHeadBytes(pFile, &pSdb->curVer, sizeof(int64_t)) != 0) {
    return -1;
  }

  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t id = (slot < SDB_MAX) ? pSdb->maxId[slot] : 0;
    if (sdbWriteHeadBytes(pFile, &id, sizeof(int64_t)) != 0) {
      return -1;
    }
  }

  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t tableVer = (slot < SDB_MAX) ? pSdb->tableVer[slot] : 0;
    if (sdbWriteHeadBytes(pFile, &tableVer, sizeof(int64_t)) != 0) {
      return -1;
    }
  }

  char reserve[SDB_RESERVE_SIZE] = {0};
  return sdbWriteHeadBytes(pFile, reserve, sizeof(reserve));
}

S
Shengliang Guan 已提交
157 158 159 160 161 162
/*
 * Load "<currDir>/sdb.data" into pSdb.
 *
 * The file consists of a header (see sdbReadFileHead) followed by records.
 * Each record is an SSdbRaw header, dataLen payload bytes and an int32
 * checksum covering the header and payload. Every record that passes the
 * checksum is handed to sdbWriteWithoutFree().
 *
 * Return value:
 *   - 0 when the whole file was read and applied; pSdb->lastCommitVer is then
 *     set to pSdb->curVer and pSdb->tableVer is restored to the values read
 *     from the file header.
 *   - 0 when the file cannot be opened (terrno is still set and the failure
 *     is logged). NOTE(review): presumably so that a missing file on first
 *     start is not fatal -- confirm against the caller.
 *   - -1 on allocation failure or when the header cannot be read.
 *   - a non-zero error code (also stored in terrno) when a record cannot be
 *     read, is truncated, has an out-of-range length, fails its checksum, or
 *     is rejected by sdbWriteWithoutFree().
 */
int32_t sdbReadFile(SSdb *pSdb) {
  int32_t code = 0;
  int32_t readLen = 0;
  int64_t ret = 0;

  SSdbRaw *pRaw = taosMemoryMalloc(WAL_MAX_SIZE + 100);
  if (pRaw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed read file since %s", terrstr());
    return -1;
  }

  /* Largest payload that fits in pRaw next to the record header and the trailing checksum. */
  const int64_t maxDataLen = (int64_t)(WAL_MAX_SIZE + 100) - (int64_t)sizeof(SSdbRaw) - (int64_t)sizeof(int32_t);

  char file[PATH_MAX] = {0};
  snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
  mDebug("start to read file:%s", file);

  TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
    taosMemoryFree(pRaw);
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to read file:%s since %s", file, terrstr());
    return 0;
  }

  if (sdbReadFileHead(pSdb, pFile) != 0) {
    mError("failed to read file:%s head since %s", file, terrstr());
    pSdb->curVer = -1;
    taosMemoryFree(pRaw);
    taosCloseFile(&pFile);
    return -1;
  }

  /* Keep the header's table versions so they can be restored after the records are applied. */
  int64_t tableVer[SDB_MAX] = {0};
  memcpy(tableVer, pSdb->tableVer, sizeof(tableVer));

  while (1) {
    readLen = sizeof(SSdbRaw);
    ret = taosReadFile(pFile, pRaw, readLen);
    if (ret == 0) break; /* clean end of file */

    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    if (ret != readLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    /* dataLen comes from the file: reject values that would overrun pRaw. */
    int64_t dataLen = pRaw->dataLen;
    if (dataLen < 0 || dataLen > maxDataLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read file:%s since %s, invalid dataLen:%" PRId64, file, tstrerror(code), dataLen);
      goto _OVER;
    }

    /* Payload plus the trailing int32 checksum. */
    readLen = pRaw->dataLen + sizeof(int32_t);
    ret = taosReadFile(pFile, pRaw->pData, readLen);
    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    if (ret != readLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t);
    if (!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen)) {
      code = TSDB_CODE_CHECKSUM_ERROR;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    code = sdbWriteWithoutFree(pSdb, pRaw);
    if (code != 0) {
      mError("failed to read file:%s since %s", file, terrstr());
      goto _OVER;
    }
  }

  /* Only reached on clean EOF: every failure above jumps straight to _OVER with its code intact. */
  code = 0;
  pSdb->lastCommitVer = pSdb->curVer;
  memcpy(pSdb->tableVer, tableVer, sizeof(tableVer));
  mDebug("read file:%s successfully, ver:%" PRId64, file, pSdb->lastCommitVer);

_OVER:
  taosCloseFile(&pFile);
  sdbFreeRaw(pRaw);

  terrno = code;
  return code;
}

S
Shengliang Guan 已提交
251
/*
 * Write the whole in-memory sdb to "<tmpDir>/sdb.data", fsync it, and rename
 * it over "<currDir>/sdb.data".
 *
 * After the file header, every row whose status is SDB_STATUS_READY or
 * SDB_STATUS_DROPPING is encoded with the table's encode callback and written
 * as the raw record followed by an int32 checksum. Tables are visited from
 * SDB_MAX - 1 down to 0; tables without an encode callback are skipped. Each
 * table's lock is held for writing while its rows are iterated.
 *
 * Writing stops at the first failure: no further rows or tables are
 * processed, and the temp file is neither synced nor renamed.
 * NOTE(review): the temp file is left in place on failure.
 *
 * Returns 0 on success and sets pSdb->lastCommitVer to pSdb->curVer.
 * Returns -1 (terrno set) if the temp file cannot be opened or the header
 * cannot be written; otherwise returns the error code, also stored in terrno.
 */
static int32_t sdbWriteFileImp(SSdb *pSdb) {
  int32_t code = 0;

  char tmpfile[PATH_MAX] = {0};
  snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
  char curfile[PATH_MAX] = {0};
  snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);

  mDebug("start to write file:%s, current ver:%" PRId64 ", commit ver:%" PRId64, curfile, pSdb->curVer,
         pSdb->lastCommitVer);

  TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to open file:%s for write since %s", tmpfile, terrstr());
    return -1;
  }

  if (sdbWriteFileHead(pSdb, pFile) != 0) {
    mError("failed to write file:%s head since %s", tmpfile, terrstr());
    taosCloseFile(&pFile);
    return -1;
  }

  /* `code == 0` in the condition stops the table walk after the first failure. */
  for (int32_t i = SDB_MAX - 1; i >= 0 && code == 0; --i) {
    SdbEncodeFp encodeFp = pSdb->encodeFps[i];
    if (encodeFp == NULL) continue;

    mTrace("write %s to file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));

    SHashObj       *hash = pSdb->hashObjs[i];
    TdThreadRwlock *pLock = &pSdb->locks[i];
    taosThreadRwlockWrlock(pLock);

    SSdbRow **ppRow = taosHashIterate(hash, NULL);
    while (ppRow != NULL) {
      SSdbRow *pRow = *ppRow;
      if (pRow == NULL) {
        ppRow = taosHashIterate(hash, ppRow);
        continue;
      }

      if (pRow->status != SDB_STATUS_READY && pRow->status != SDB_STATUS_DROPPING) {
        sdbPrintOper(pSdb, pRow, "not-write");
        ppRow = taosHashIterate(hash, ppRow);
        continue;
      }

      sdbPrintOper(pSdb, pRow, "write");

      SSdbRaw *pRaw = (*encodeFp)(pRow->pObj);
      if (pRaw == NULL) {
        code = TSDB_CODE_SDB_APP_ERROR;
        taosHashCancelIterate(hash, ppRow);
        break;
      }

      pRaw->status = pRow->status;
      int32_t writeLen = sizeof(SSdbRaw) + pRaw->dataLen;
      if (taosWriteFile(pFile, pRaw, writeLen) != writeLen) {
        code = TAOS_SYSTEM_ERROR(errno);
      } else {
        /* The checksum covers the record header plus payload and is stored right after them. */
        int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
        if (taosWriteFile(pFile, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
          code = TAOS_SYSTEM_ERROR(errno);
        }
      }

      sdbFreeRaw(pRaw);
      if (code != 0) {
        /* Leaving the iteration early: release the iterator before breaking out. */
        taosHashCancelIterate(hash, ppRow);
        break;
      }

      ppRow = taosHashIterate(hash, ppRow);
    }
    taosThreadRwlockUnlock(pLock);
  }

  if (code == 0) {
    code = taosFsyncFile(pFile);
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to sync file:%s since %s", tmpfile, tstrerror(code));
    }
  }

  taosCloseFile(&pFile);

  if (code == 0) {
    /* Replace the current file only after the temp file is fully written and synced. */
    code = taosRenameFile(tmpfile, curfile);
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
    }
  }

  if (code != 0) {
    mError("failed to write file:%s since %s", curfile, tstrerror(code));
  } else {
    pSdb->lastCommitVer = pSdb->curVer;
    mDebug("write file:%s successfully, ver:%" PRId64, curfile, pSdb->lastCommitVer);
  }

  terrno = code;
  return code;
}

S
Shengliang Guan 已提交
360 361 362 363 364 365 366 367
/*
 * Write the sdb to disk only when it has changed since the last commit,
 * i.e. when pSdb->curVer differs from pSdb->lastCommitVer.
 *
 * Returns 0 when there is nothing to write, otherwise the result of
 * sdbWriteFileImp().
 */
int32_t sdbWriteFile(SSdb *pSdb) {
  if (pSdb->curVer != pSdb->lastCommitVer) {
    return sdbWriteFileImp(pSdb);
  }

  return 0;
}

S
Shengliang Guan 已提交
368
/*
 * Run all deploy callbacks and then write the sdb file unconditionally.
 *
 * Returns 0 when both steps succeed, -1 when either fails. The file is not
 * written if a deploy callback fails.
 */
int32_t sdbDeploy(SSdb *pSdb) {
  /* Short-circuit evaluation: the write step runs only if the deploy step succeeded. */
  int32_t failed = (sdbRunDeployFp(pSdb) != 0) || (sdbWriteFileImp(pSdb) != 0);
  return failed ? -1 : 0;
}