sdbFile.c 12.1 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "sdbInt.h"
S
Shengliang Guan 已提交
18
#include "tchecksum.h"
S
Shengliang Guan 已提交
19
#include "wal.h"
S
Shengliang Guan 已提交
20

S
Shengliang Guan 已提交
21
#define SDB_TABLE_SIZE   24
S
Shengliang Guan 已提交
22
#define SDB_RESERVE_SIZE 512
S
Shengliang Guan 已提交
23
#define SDB_FILE_VER     1
S
Shengliang Guan 已提交
24

S
Shengliang Guan 已提交
25
static int32_t sdbRunDeployFp(SSdb *pSdb) {
S
Shengliang Guan 已提交
26
  mDebug("start to deploy sdb");
S
Shengliang Guan 已提交
27

S
Shengliang Guan 已提交
28
  for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
S
Shengliang Guan 已提交
29
    SdbDeployFp fp = pSdb->deployFps[i];
S
Shengliang Guan 已提交
30
    if (fp == NULL) continue;
S
Shengliang Guan 已提交
31

S
Shengliang Guan 已提交
32
    if ((*fp)(pSdb->pMnode) != 0) {
33
      mError("failed to deploy sdb:%s since %s", sdbTableName(i), terrstr());
S
Shengliang Guan 已提交
34
      return -1;
S
Shengliang Guan 已提交
35 36 37
    }
  }

S
Shengliang Guan 已提交
38
  mDebug("sdb deploy successfully");
S
Shengliang Guan 已提交
39 40 41
  return 0;
}

42
static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
S
Shengliang Guan 已提交
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
  int64_t sver = 0;
  int32_t ret = taosReadFile(pFile, &sver, sizeof(int64_t));
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(int64_t)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }
  if (sver != SDB_FILE_VER) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

  ret = taosReadFile(pFile, &pSdb->curVer, sizeof(int64_t));
S
Shengliang Guan 已提交
59 60 61 62 63 64 65 66 67
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(int64_t)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

68 69 70 71 72 73 74 75 76 77
  ret = taosReadFile(pFile, &pSdb->curTerm, sizeof(int64_t));
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(int64_t)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

S
Shengliang Guan 已提交
78
  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
S
Shengliang Guan 已提交
79
    int64_t maxId = 0;
80
    ret = taosReadFile(pFile, &maxId, sizeof(int64_t));
S
Shengliang Guan 已提交
81 82 83 84 85 86 87 88 89 90 91 92 93 94
    if (ret < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
    if (ret != sizeof(int64_t)) {
      terrno = TSDB_CODE_FILE_CORRUPTED;
      return -1;
    }
    if (i < SDB_MAX) {
      pSdb->maxId[i] = maxId;
    }
  }

  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
S
Shengliang Guan 已提交
95
    int64_t ver = 0;
96
    ret = taosReadFile(pFile, &ver, sizeof(int64_t));
S
Shengliang Guan 已提交
97 98 99 100 101 102 103 104 105 106 107 108 109 110
    if (ret < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
    if (ret != sizeof(int64_t)) {
      terrno = TSDB_CODE_FILE_CORRUPTED;
      return -1;
    }
    if (i < SDB_MAX) {
      pSdb->tableVer[i] = ver;
    }
  }

  char reserve[SDB_RESERVE_SIZE] = {0};
111
  ret = taosReadFile(pFile, reserve, sizeof(reserve));
S
Shengliang Guan 已提交
112 113 114 115 116 117 118 119 120 121 122 123
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(reserve)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

  return 0;
}

124
static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
S
Shengliang Guan 已提交
125 126 127 128 129 130
  int64_t sver = SDB_FILE_VER;
  if (taosWriteFile(pFile, &sver, sizeof(int64_t)) != sizeof(int64_t)) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

131
  if (taosWriteFile(pFile, &pSdb->curVer, sizeof(int64_t)) != sizeof(int64_t)) {
S
Shengliang Guan 已提交
132 133 134 135
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

136 137 138 139 140
  if (taosWriteFile(pFile, &pSdb->curTerm, sizeof(int64_t)) != sizeof(int64_t)) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

S
Shengliang Guan 已提交
141
  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
S
Shengliang Guan 已提交
142
    int64_t maxId = 0;
S
Shengliang Guan 已提交
143 144 145
    if (i < SDB_MAX) {
      maxId = pSdb->maxId[i];
    }
146
    if (taosWriteFile(pFile, &maxId, sizeof(int64_t)) != sizeof(int64_t)) {
S
Shengliang Guan 已提交
147 148 149 150 151 152
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
  }

  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
S
Shengliang Guan 已提交
153
    int64_t ver = 0;
S
Shengliang Guan 已提交
154 155 156
    if (i < SDB_MAX) {
      ver = pSdb->tableVer[i];
    }
157
    if (taosWriteFile(pFile, &ver, sizeof(int64_t)) != sizeof(int64_t)) {
S
Shengliang Guan 已提交
158 159 160 161 162
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
  }

S
Shengliang Guan 已提交
163
  char reserve[SDB_RESERVE_SIZE] = {0};
164
  if (taosWriteFile(pFile, reserve, sizeof(reserve)) != sizeof(reserve)) {
S
Shengliang Guan 已提交
165 166 167 168 169 170 171
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
172 173 174 175 176 177
int32_t sdbReadFile(SSdb *pSdb) {
  int64_t offset = 0;
  int32_t code = 0;
  int32_t readLen = 0;
  int64_t ret = 0;

S
Shengliang Guan 已提交
178
  SSdbRaw *pRaw = taosMemoryMalloc(WAL_MAX_SIZE + 100);
S
Shengliang Guan 已提交
179
  if (pRaw == NULL) {
S
Shengliang Guan 已提交
180
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
181
    mError("failed read file since %s", terrstr());
S
Shengliang Guan 已提交
182
    return -1;
S
Shengliang Guan 已提交
183 184 185
  }

  char file[PATH_MAX] = {0};
S
Shengliang Guan 已提交
186
  snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
S
Shengliang Guan 已提交
187
  mDebug("start to read file:%s", file);
S
Shengliang Guan 已提交
188

189 190
  TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
wafwerar's avatar
wafwerar 已提交
191
    taosMemoryFree(pRaw);
S
Shengliang Guan 已提交
192 193
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to read file:%s since %s", file, terrstr());
S
Shengliang Guan 已提交
194
    return 0;
S
Shengliang Guan 已提交
195 196
  }

197
  if (sdbReadFileHead(pSdb, pFile) != 0) {
S
Shengliang Guan 已提交
198 199
    mError("failed to read file:%s head since %s", file, terrstr());
    pSdb->curVer = -1;
200
    pSdb->curTerm = -1;
wafwerar's avatar
wafwerar 已提交
201
    taosMemoryFree(pRaw);
202
    taosCloseFile(&pFile);
S
Shengliang Guan 已提交
203 204 205
    return -1;
  }

S
Shengliang Guan 已提交
206 207 208
  int64_t tableVer[SDB_MAX] = {0};
  memcpy(tableVer, pSdb->tableVer, sizeof(tableVer));

S
Shengliang Guan 已提交
209
  while (1) {
S
Shengliang Guan 已提交
210
    readLen = sizeof(SSdbRaw);
211
    ret = taosReadFile(pFile, pRaw, readLen);
S
Shengliang Guan 已提交
212 213 214 215 216 217 218 219
    if (ret == 0) break;

    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read file:%s since %s", file, tstrerror(code));
      break;
    }

S
Shengliang Guan 已提交
220
    if (ret != readLen) {
S
Shengliang Guan 已提交
221 222 223 224 225
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      break;
    }

S
Shengliang Guan 已提交
226
    readLen = pRaw->dataLen + sizeof(int32_t);
227
    ret = taosReadFile(pFile, pRaw->pData, readLen);
S
Shengliang Guan 已提交
228 229 230 231 232 233
    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read file:%s since %s", file, tstrerror(code));
      break;
    }

S
Shengliang Guan 已提交
234
    if (ret != readLen) {
S
Shengliang Guan 已提交
235 236 237 238 239
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      break;
    }

S
Shengliang Guan 已提交
240
    int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t);
241
    if ((!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen)) != 0) {
S
Shengliang Guan 已提交
242
      code = TSDB_CODE_CHECKSUM_ERROR;
S
Shengliang Guan 已提交
243 244 245 246
      mError("failed to read file:%s since %s", file, tstrerror(code));
      break;
    }

247
    code = sdbWriteWithoutFree(pSdb, pRaw);
S
Shengliang Guan 已提交
248
    if (code != 0) {
S
Shengliang Guan 已提交
249
      mError("failed to read file:%s since %s", file, terrstr());
S
Shengliang Guan 已提交
250
      goto _OVER;
S
Shengliang Guan 已提交
251 252 253 254
    }
  }

  code = 0;
S
Shengliang Guan 已提交
255
  pSdb->lastCommitVer = pSdb->curVer;
S
Shengliang Guan 已提交
256
  memcpy(pSdb->tableVer, tableVer, sizeof(tableVer));
S
Shengliang Guan 已提交
257
  mDebug("read file:%s successfully, ver:%" PRId64, file, pSdb->lastCommitVer);
S
Shengliang Guan 已提交
258

S
Shengliang Guan 已提交
259
_OVER:
260
  taosCloseFile(&pFile);
S
Shengliang Guan 已提交
261
  sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
262

S
Shengliang Guan 已提交
263
  terrno = code;
S
Shengliang Guan 已提交
264 265 266
  return code;
}

S
Shengliang Guan 已提交
267
static int32_t sdbWriteFileImp(SSdb *pSdb) {
S
Shengliang Guan 已提交
268 269
  int32_t code = 0;

S
Shengliang Guan 已提交
270
  char tmpfile[PATH_MAX] = {0};
S
Shengliang Guan 已提交
271
  snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
S
Shengliang Guan 已提交
272
  char curfile[PATH_MAX] = {0};
S
Shengliang Guan 已提交
273
  snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
S
Shengliang Guan 已提交
274

275 276
  mDebug("start to write file:%s, current ver:%" PRId64 " term:%" PRId64 ", commit ver:%" PRId64, curfile, pSdb->curVer,
         pSdb->curTerm, pSdb->lastCommitVer);
S
Shengliang Guan 已提交
277

278
  TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
279
  if (pFile == NULL) {
S
Shengliang Guan 已提交
280 281 282
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to open file:%s for write since %s", tmpfile, terrstr());
    return -1;
S
Shengliang Guan 已提交
283 284
  }

285
  if (sdbWriteFileHead(pSdb, pFile) != 0) {
S
Shengliang Guan 已提交
286
    mError("failed to write file:%s head since %s", tmpfile, terrstr());
287
    taosCloseFile(&pFile);
S
Shengliang Guan 已提交
288 289 290 291
    return -1;
  }

  for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
S
Shengliang Guan 已提交
292
    SdbEncodeFp encodeFp = pSdb->encodeFps[i];
S
Shengliang Guan 已提交
293 294
    if (encodeFp == NULL) continue;

S
Shengliang Guan 已提交
295
    mTrace("write %s to file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));
S
Shengliang Guan 已提交
296

S
Shengliang Guan 已提交
297
    SHashObj       *hash = pSdb->hashObjs[i];
298 299
    TdThreadRwlock *pLock = &pSdb->locks[i];
    taosThreadRwlockWrlock(pLock);
S
Shengliang Guan 已提交
300

S
Shengliang Guan 已提交
301 302 303
    SSdbRow **ppRow = taosHashIterate(hash, NULL);
    while (ppRow != NULL) {
      SSdbRow *pRow = *ppRow;
304 305 306 307 308 309 310
      if (pRow == NULL) {
        ppRow = taosHashIterate(hash, ppRow);
        continue;
      }

      if (pRow->status != SDB_STATUS_READY && pRow->status != SDB_STATUS_DROPPING) {
        sdbPrintOper(pSdb, pRow, "not-write");
S
Shengliang Guan 已提交
311 312 313
        ppRow = taosHashIterate(hash, ppRow);
        continue;
      }
S
Shengliang Guan 已提交
314

315
      sdbPrintOper(pSdb, pRow, "write");
S
Shengliang Guan 已提交
316

S
Shengliang Guan 已提交
317 318
      SSdbRaw *pRaw = (*encodeFp)(pRow->pObj);
      if (pRaw != NULL) {
S
Shengliang Guan 已提交
319
        pRaw->status = pRow->status;
S
Shengliang Guan 已提交
320
        int32_t writeLen = sizeof(SSdbRaw) + pRaw->dataLen;
321
        if (taosWriteFile(pFile, pRaw, writeLen) != writeLen) {
S
Shengliang Guan 已提交
322
          code = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
323
          taosHashCancelIterate(hash, ppRow);
S
Shengliang Guan 已提交
324
          sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
325 326
          break;
        }
S
Shengliang Guan 已提交
327 328

        int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
329
        if (taosWriteFile(pFile, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
S
Shengliang Guan 已提交
330
          code = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
331
          taosHashCancelIterate(hash, ppRow);
S
Shengliang Guan 已提交
332
          sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
333 334
          break;
        }
S
Shengliang Guan 已提交
335 336
      } else {
        code = TSDB_CODE_SDB_APP_ERROR;
S
Shengliang Guan 已提交
337
        taosHashCancelIterate(hash, ppRow);
S
Shengliang Guan 已提交
338 339 340
        break;
      }

S
Shengliang Guan 已提交
341
      sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
342
      ppRow = taosHashIterate(hash, ppRow);
S
Shengliang Guan 已提交
343
    }
344
    taosThreadRwlockUnlock(pLock);
S
Shengliang Guan 已提交
345 346 347
  }

  if (code == 0) {
348
    code = taosFsyncFile(pFile);
S
Shengliang Guan 已提交
349 350
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
wafwerar's avatar
wafwerar 已提交
351
      mError("failed to sync file:%s since %s", tmpfile, tstrerror(code));
S
Shengliang Guan 已提交
352
    }
S
Shengliang Guan 已提交
353 354
  }

355
  taosCloseFile(&pFile);
S
Shengliang Guan 已提交
356

S
Shengliang Guan 已提交
357
  if (code == 0) {
S
Shengliang Guan 已提交
358
    code = taosRenameFile(tmpfile, curfile);
S
Shengliang Guan 已提交
359 360 361 362
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to write file:%s since %s", curfile, tstrerror(code));
    }
S
Shengliang Guan 已提交
363 364 365
  }

  if (code != 0) {
S
Shengliang Guan 已提交
366
    mError("failed to write file:%s since %s", curfile, tstrerror(code));
S
Shengliang Guan 已提交
367
  } else {
S
Shengliang Guan 已提交
368
    pSdb->lastCommitVer = pSdb->curVer;
369
    mDebug("write file:%s successfully, ver:%" PRId64 " term:%" PRId64, curfile, pSdb->lastCommitVer, pSdb->curTerm);
S
Shengliang Guan 已提交
370 371
  }

S
Shengliang Guan 已提交
372
  terrno = code;
S
Shengliang Guan 已提交
373 374 375
  return code;
}

S
Shengliang Guan 已提交
376 377 378 379 380 381 382 383
int32_t sdbWriteFile(SSdb *pSdb) {
  if (pSdb->curVer == pSdb->lastCommitVer) {
    return 0;
  }

  return sdbWriteFileImp(pSdb);
}

S
Shengliang Guan 已提交
384
int32_t sdbDeploy(SSdb *pSdb) {
S
Shengliang Guan 已提交
385 386
  if (sdbRunDeployFp(pSdb) != 0) {
    return -1;
S
Shengliang Guan 已提交
387 388
  }

S
Shengliang Guan 已提交
389
  if (sdbWriteFileImp(pSdb) != 0) {
S
Shengliang Guan 已提交
390
    return -1;
S
Shengliang Guan 已提交
391 392 393 394
  }

  return 0;
}
S
Shengliang Guan 已提交
395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457

SSdbIter *sdbIterInit(SSdb *pSdb) {
  char datafile[PATH_MAX] = {0};
  char tmpfile[PATH_MAX] = {0};
  snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
  snprintf(tmpfile, sizeof(datafile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);

  if (taosCopyFile(datafile, tmpfile) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to copy file %s to %s since %s", datafile, tmpfile, terrstr());
    return NULL;
  }

  SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter));
  if (pIter == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pIter->file = taosOpenFile(tmpfile, TD_FILE_READ);
  if (pIter->file == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to read snapshot file:%s since %s", tmpfile, terrstr());
    taosMemoryFree(pIter);
    return NULL;
  }

  mDebug("start to read snapshot file:%s, iter:%p", tmpfile, pIter);
  return pIter;
}

SSdbIter *sdbIterRead(SSdb *pSdb, SSdbIter *pIter, char **ppBuf, int32_t *buflen) {
  const int32_t maxlen = 100;

  char *pBuf = taosMemoryCalloc(1, maxlen);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  int32_t readlen = taosReadFile(pIter->file, pBuf, maxlen);
  if (readlen == 0) {
    mTrace("read snapshot to the end, readlen:%" PRId64, pIter->readlen);
    taosMemoryFree(pBuf);
    taosCloseFile(&pIter->file);
    taosMemoryFree(pIter);
    pIter = NULL;
  } else if (readlen < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to read snapshot since %s, readlen:%" PRId64, terrstr(), pIter->readlen);
    taosMemoryFree(pBuf);
    taosCloseFile(&pIter->file);
    taosMemoryFree(pIter);
    pIter = NULL;
  } else {
    pIter->readlen += readlen;
    mTrace("read snapshot, readlen:%" PRId64, pIter->readlen);
    *ppBuf = pBuf;
    *buflen = readlen;
  }

  return pIter;
}