/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "sdb.h"
#include "tchecksum.h"
#include "wal.h"

// Number of per-table slots (one max-id and one version entry each) stored in the sdb file head.
// Slots at index >= SDB_MAX are written as zero and ignored when read back.
#define SDB_TABLE_SIZE   24
// Size in bytes of the zero-filled reserved area that ends the sdb file head.
#define SDB_RESERVE_SIZE 512
// Version tag written as the first field of the sdb file head and verified on read.
#define SDB_FILE_VER     1
// Runs every registered deploy callback, walking the table types from SDB_MAX - 1 down to 0.
// Returns 0 when all callbacks succeed, or -1 as soon as one of them returns non-zero.
static int32_t sdbDeployData(SSdb *pSdb) {
  mDebug("start to deploy sdb");

  int32_t type = SDB_MAX;
  while (--type >= 0) {
    SdbDeployFp deployFp = pSdb->deployFps[type];
    if (deployFp != NULL) {
      mDebug("start to deploy sdb:%s", sdbTableName(type));

      int32_t ret = deployFp(pSdb->pMnode);
      if (ret != 0) {
        mError("failed to deploy sdb:%s since %s", sdbTableName(type), terrstr());
        return -1;
      }
    }
  }

  mDebug("sdb deploy successfully");
  return 0;
}

// Drops all in-memory sdb content: frees every row of every table, clears the hash tables,
// zeroes the per-table version / max-id counters and sets the apply/commit markers to -1.
static void sdbResetData(SSdb *pSdb) {
  mDebug("start to reset sdb");

  // First pass: release the rows held by each table.
  for (ESdbType i = 0; i < SDB_MAX; ++i) {
    SHashObj *hash = pSdb->hashObjs[i];
    if (hash == NULL) continue;

    SSdbRow **ppRow = taosHashIterate(hash, NULL);
    while (ppRow != NULL) {
      SSdbRow *pRow = *ppRow;
      if (pRow != NULL) {
        sdbFreeRow(pSdb, pRow, true);
      }
      // Always advance the iterator; skipping this for an empty slot would spin forever.
      ppRow = taosHashIterate(hash, ppRow);
    }
  }

  // Second pass: empty the hash tables and reset the per-table counters.
  for (ESdbType i = 0; i < SDB_MAX; ++i) {
    SHashObj *hash = pSdb->hashObjs[i];
    if (hash == NULL) continue;

    taosHashClear(pSdb->hashObjs[i]);
    pSdb->tableVer[i] = 0;
    pSdb->maxId[i] = 0;
    mDebug("sdb:%s is reset", sdbTableName(i));
  }

  pSdb->applyIndex = -1;
  pSdb->applyTerm = -1;
  pSdb->applyConfig = -1;
  pSdb->commitIndex = -1;
  pSdb->commitTerm = -1;
  pSdb->commitConfig = -1;
  mDebug("sdb reset successfully");
}

// Reads exactly `size` bytes from pFile into pBuf.
// Returns 0 on success. On failure returns -1 with terrno set to the system error when the
// read itself failed, or to TSDB_CODE_FILE_CORRUPTED when fewer bytes than requested arrived.
static int32_t sdbReadHeadField(TdFilePtr pFile, void *pBuf, int64_t size) {
  // Kept as int64_t (the width sdbReadFileImp uses for taosReadFile results) to avoid narrowing.
  int64_t ret = taosReadFile(pFile, pBuf, size);
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != size) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }
  return 0;
}

// Parses the sdb file head from the current position of pFile into pSdb.
// Layout, in order: file version, applyIndex, applyTerm, applyConfig (int64_t each),
// SDB_TABLE_SIZE max-id slots, SDB_TABLE_SIZE table-version slots, then SDB_RESERVE_SIZE
// reserved bytes. Returns 0 on success, -1 with terrno set otherwise.
static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
  int64_t sver = 0;
  if (sdbReadHeadField(pFile, &sver, sizeof(int64_t)) != 0) return -1;
  if (sver != SDB_FILE_VER) {
    // An unexpected version is reported the same way as a damaged file.
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

  if (sdbReadHeadField(pFile, &pSdb->applyIndex, sizeof(int64_t)) != 0) return -1;
  if (sdbReadHeadField(pFile, &pSdb->applyTerm, sizeof(int64_t)) != 0) return -1;
  if (sdbReadHeadField(pFile, &pSdb->applyConfig, sizeof(int64_t)) != 0) return -1;

  // The file always carries SDB_TABLE_SIZE slots; only the first SDB_MAX are kept.
  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
    int64_t maxId = 0;
    if (sdbReadHeadField(pFile, &maxId, sizeof(int64_t)) != 0) return -1;
    if (i < SDB_MAX) {
      pSdb->maxId[i] = maxId;
    }
  }

  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
    int64_t ver = 0;
    if (sdbReadHeadField(pFile, &ver, sizeof(int64_t)) != 0) return -1;
    if (i < SDB_MAX) {
      pSdb->tableVer[i] = ver;
    }
  }

  // The reserved area is consumed and discarded so the file position ends after the head.
  char reserve[SDB_RESERVE_SIZE] = {0};
  if (sdbReadHeadField(pFile, reserve, sizeof(reserve)) != 0) return -1;

  return 0;
}

// Writes exactly `size` bytes from pBuf to pFile.
// Returns 0 on success, or -1 with terrno set to the system error when the write falls short.
static int32_t sdbWriteHeadField(TdFilePtr pFile, void *pBuf, int64_t size) {
  if (taosWriteFile(pFile, pBuf, size) != size) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  return 0;
}

// Serializes the sdb file head of pSdb to pFile.
// Layout, in order: SDB_FILE_VER, applyIndex, applyTerm, applyConfig (int64_t each),
// SDB_TABLE_SIZE max-id slots, SDB_TABLE_SIZE table-version slots, then SDB_RESERVE_SIZE
// zero bytes. Returns 0 on success, -1 with terrno set otherwise.
static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
  int64_t sver = SDB_FILE_VER;
  if (sdbWriteHeadField(pFile, &sver, sizeof(int64_t)) != 0) return -1;

  if (sdbWriteHeadField(pFile, &pSdb->applyIndex, sizeof(int64_t)) != 0) return -1;
  if (sdbWriteHeadField(pFile, &pSdb->applyTerm, sizeof(int64_t)) != 0) return -1;
  if (sdbWriteHeadField(pFile, &pSdb->applyConfig, sizeof(int64_t)) != 0) return -1;

  // Slots beyond SDB_MAX have no table behind them and are padded with zero.
  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
    int64_t maxId = 0;
    if (i < SDB_MAX) {
      maxId = pSdb->maxId[i];
    }
    if (sdbWriteHeadField(pFile, &maxId, sizeof(int64_t)) != 0) return -1;
  }

  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
    int64_t ver = 0;
    if (i < SDB_MAX) {
      ver = pSdb->tableVer[i];
    }
    if (sdbWriteHeadField(pFile, &ver, sizeof(int64_t)) != 0) return -1;
  }

  char reserve[SDB_RESERVE_SIZE] = {0};
  if (sdbWriteHeadField(pFile, reserve, sizeof(reserve)) != 0) return -1;

  return 0;
}

// Loads "<currDir>/sdb.data" into pSdb: parses the file head, then reads records until end of
// file. Each record is an SSdbRaw header, dataLen payload bytes and a trailing int32_t checksum;
// every verified record is handed to sdbWriteWithoutFree.
//
// Returns 0 on success, and also when the file cannot be opened (terrno still carries the open
// error in that case). Returns -1 when the buffer cannot be allocated or the head is invalid,
// and a non-zero error code (mirrored in terrno) when a record cannot be read, is truncated,
// declares a length that does not fit the buffer, fails its checksum, or is rejected by
// sdbWriteWithoutFree.
static int32_t sdbReadFileImp(SSdb *pSdb) {
  int32_t code = 0;
  int64_t ret = 0;
  char    file[PATH_MAX] = {0};

  snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
  mDebug("start to read sdb file:%s", file);

  const int64_t bufLen = (int64_t)(WAL_MAX_SIZE + 100);
  // Largest payload that still leaves room for the record header and the trailing checksum.
  const int64_t maxDataLen = bufLen - (int64_t)sizeof(SSdbRaw) - (int64_t)sizeof(int32_t);

  SSdbRaw *pRaw = taosMemoryMalloc(bufLen);
  if (pRaw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed read sdb file since %s", terrstr());
    return -1;
  }

  TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
    taosMemoryFree(pRaw);
    terrno = TAOS_SYSTEM_ERROR(errno);
    mDebug("failed to read sdb file:%s since %s", file, terrstr());
    // Deliberately not an error: the caller continues with an empty sdb.
    return 0;
  }

  if (sdbReadFileHead(pSdb, pFile) != 0) {
    mError("failed to read sdb file:%s head since %s", file, terrstr());
    taosMemoryFree(pRaw);
    taosCloseFile(&pFile);
    return -1;
  }

  // Keep the table versions read from the head; they are written back after the rows are loaded.
  // NOTE(review): presumably sdbWriteWithoutFree changes tableVer while loading - confirm.
  int64_t tableVer[SDB_MAX] = {0};
  memcpy(tableVer, pSdb->tableVer, sizeof(tableVer));

  while (1) {
    int64_t readLen = sizeof(SSdbRaw);
    ret = taosReadFile(pFile, pRaw, readLen);
    if (ret == 0) break;  // clean end of file on a record boundary

    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    if (ret != readLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    // dataLen comes from the file: reject values that would overrun the fixed-size buffer.
    if (pRaw->dataLen < 0 || pRaw->dataLen > maxDataLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    readLen = pRaw->dataLen + sizeof(int32_t);
    ret = taosReadFile(pFile, pRaw->pData, readLen);
    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    if (ret != readLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t);
    if (!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen)) {
      code = TSDB_CODE_CHECKSUM_ERROR;
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    code = sdbWriteWithoutFree(pSdb, pRaw);
    if (code != 0) {
      mError("failed to read sdb file:%s since %s", file, terrstr());
      goto _OVER;
    }
  }

  // Only reached at end of file with every record accepted, so code is 0 here.
  pSdb->commitIndex = pSdb->applyIndex;
  pSdb->commitTerm = pSdb->applyTerm;
  pSdb->commitConfig = pSdb->applyConfig;
  memcpy(pSdb->tableVer, tableVer, sizeof(tableVer));
  mDebug("read sdb file:%s successfully, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64, file,
         pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig);

_OVER:
  taosCloseFile(&pFile);
  sdbFreeRaw(pRaw);

  terrno = code;
  return code;
}

// Replaces the in-memory sdb content with what is stored in the sdb file, holding
// pSdb->filelock for the whole operation. The sdb is reset before loading and reset again
// if loading fails. Returns the result of sdbReadFileImp.
int32_t sdbReadFile(SSdb *pSdb) {
  int32_t result = 0;

  taosThreadMutexLock(&pSdb->filelock);

  sdbResetData(pSdb);
  result = sdbReadFileImp(pSdb);
  if (result != 0) {
    mError("failed to read sdb file since %s", terrstr());
    // Discard whatever was loaded before the failure.
    sdbResetData(pSdb);
  }

  taosThreadMutexUnlock(&pSdb->filelock);
  return result;
}

static int32_t sdbWriteFileImp(SSdb *pSdb) {
S
Shengliang Guan 已提交
335 336
  int32_t code = 0;

S
Shengliang Guan 已提交
337
  char tmpfile[PATH_MAX] = {0};
S
Shengliang Guan 已提交
338
  snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
S
Shengliang Guan 已提交
339
  char curfile[PATH_MAX] = {0};
S
Shengliang Guan 已提交
340
  snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
S
Shengliang Guan 已提交
341

S
Shengliang Guan 已提交
342 343 344 345
  mDebug("start to write sdb file, apply index:%" PRId64 " term:%" PRId64 " config:%" PRId64 ", commit index:%" PRId64
         " term:%" PRId64 " config:%" PRId64 ", file:%s",
         pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig, pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig,
         curfile);
S
Shengliang Guan 已提交
346

347
  TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
348
  if (pFile == NULL) {
S
Shengliang Guan 已提交
349
    terrno = TAOS_SYSTEM_ERROR(errno);
350
    mError("failed to open sdb file:%s for write since %s", tmpfile, terrstr());
S
Shengliang Guan 已提交
351
    return -1;
S
Shengliang Guan 已提交
352 353
  }

354
  if (sdbWriteFileHead(pSdb, pFile) != 0) {
355
    mError("failed to write sdb file:%s head since %s", tmpfile, terrstr());
356
    taosCloseFile(&pFile);
S
Shengliang Guan 已提交
357 358 359 360
    return -1;
  }

  for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
S
Shengliang Guan 已提交
361
    SdbEncodeFp encodeFp = pSdb->encodeFps[i];
S
Shengliang Guan 已提交
362 363
    if (encodeFp == NULL) continue;

364
    mDebug("write %s to sdb file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));
S
Shengliang Guan 已提交
365

S
Shengliang Guan 已提交
366
    SHashObj       *hash = pSdb->hashObjs[i];
367 368
    TdThreadRwlock *pLock = &pSdb->locks[i];
    taosThreadRwlockWrlock(pLock);
S
Shengliang Guan 已提交
369

S
Shengliang Guan 已提交
370 371 372
    SSdbRow **ppRow = taosHashIterate(hash, NULL);
    while (ppRow != NULL) {
      SSdbRow *pRow = *ppRow;
373 374 375 376 377 378 379
      if (pRow == NULL) {
        ppRow = taosHashIterate(hash, ppRow);
        continue;
      }

      if (pRow->status != SDB_STATUS_READY && pRow->status != SDB_STATUS_DROPPING) {
        sdbPrintOper(pSdb, pRow, "not-write");
S
Shengliang Guan 已提交
380 381 382
        ppRow = taosHashIterate(hash, ppRow);
        continue;
      }
S
Shengliang Guan 已提交
383

384
      sdbPrintOper(pSdb, pRow, "write");
S
Shengliang Guan 已提交
385

S
Shengliang Guan 已提交
386 387
      SSdbRaw *pRaw = (*encodeFp)(pRow->pObj);
      if (pRaw != NULL) {
S
Shengliang Guan 已提交
388
        pRaw->status = pRow->status;
S
Shengliang Guan 已提交
389
        int32_t writeLen = sizeof(SSdbRaw) + pRaw->dataLen;
390
        if (taosWriteFile(pFile, pRaw, writeLen) != writeLen) {
S
Shengliang Guan 已提交
391
          code = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
392
          taosHashCancelIterate(hash, ppRow);
S
Shengliang Guan 已提交
393
          sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
394 395
          break;
        }
S
Shengliang Guan 已提交
396 397

        int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
398
        if (taosWriteFile(pFile, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
S
Shengliang Guan 已提交
399
          code = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
400
          taosHashCancelIterate(hash, ppRow);
S
Shengliang Guan 已提交
401
          sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
402 403
          break;
        }
S
Shengliang Guan 已提交
404 405
      } else {
        code = TSDB_CODE_SDB_APP_ERROR;
S
Shengliang Guan 已提交
406
        taosHashCancelIterate(hash, ppRow);
S
Shengliang Guan 已提交
407 408 409
        break;
      }

S
Shengliang Guan 已提交
410
      sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
411
      ppRow = taosHashIterate(hash, ppRow);
S
Shengliang Guan 已提交
412
    }
413
    taosThreadRwlockUnlock(pLock);
S
Shengliang Guan 已提交
414 415 416
  }

  if (code == 0) {
417
    code = taosFsyncFile(pFile);
S
Shengliang Guan 已提交
418 419
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
420
      mError("failed to sync sdb file:%s since %s", tmpfile, tstrerror(code));
S
Shengliang Guan 已提交
421
    }
S
Shengliang Guan 已提交
422 423
  }

424
  taosCloseFile(&pFile);
S
Shengliang Guan 已提交
425

S
Shengliang Guan 已提交
426
  if (code == 0) {
S
Shengliang Guan 已提交
427
    code = taosRenameFile(tmpfile, curfile);
S
Shengliang Guan 已提交
428 429
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
430
      mError("failed to write sdb file:%s since %s", curfile, tstrerror(code));
S
Shengliang Guan 已提交
431
    }
S
Shengliang Guan 已提交
432 433 434
  }

  if (code != 0) {
435
    mError("failed to write sdb file:%s since %s", curfile, tstrerror(code));
S
Shengliang Guan 已提交
436
  } else {
S
Shengliang Guan 已提交
437 438 439 440 441
    pSdb->commitIndex = pSdb->applyIndex;
    pSdb->commitTerm = pSdb->applyTerm;
    pSdb->commitConfig = pSdb->applyConfig;
    mDebug("write sdb file successfully, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
           pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig, curfile);
S
Shengliang Guan 已提交
442 443
  }

S
Shengliang Guan 已提交
444
  terrno = code;
S
Shengliang Guan 已提交
445 446 447
  return code;
}

// Persists the sdb to disk when the apply index is ahead of the commit index by at least
// `delta`; otherwise does nothing and returns 0. When pSdb->pWal is set, the write is wrapped
// in walBeginSnapshot / walEndSnapshot. Runs under pSdb->filelock and returns the first
// non-zero code produced by those steps, or 0.
int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) {
  // Nothing new has been applied, or not enough to be worth a write yet.
  if (pSdb->applyIndex == pSdb->commitIndex || pSdb->applyIndex - pSdb->commitIndex < delta) {
    return 0;
  }

  int32_t code = 0;
  taosThreadMutexLock(&pSdb->filelock);

  if (pSdb->pWal != NULL) {
    code = walBeginSnapshot(pSdb->pWal, pSdb->applyIndex);
  }

  if (code == 0) {
    code = sdbWriteFileImp(pSdb);
  }

  if (code == 0 && pSdb->pWal != NULL) {
    code = walEndSnapshot(pSdb->pWal);
  }

  if (code != 0) {
    mError("failed to write sdb file since %s", terrstr());
  }

  taosThreadMutexUnlock(&pSdb->filelock);
  return code;
}

// Runs the deploy callbacks and then writes the sdb file with a delta of 0.
// Returns 0 when both steps succeed, -1 otherwise; the write is skipped if deployment fails.
int32_t sdbDeploy(SSdb *pSdb) {
  if (sdbDeployData(pSdb) != 0 || sdbWriteFile(pSdb, 0) != 0) {
    return -1;
  }
  return 0;
}
// Allocates a zeroed snapshot iterator and gives it a file name of the form
// "<tmpDir>/sdb.data.<iterator address>". No file is opened or created here.
// Returns NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when either allocation fails.
static SSdbIter *sdbCreateIter(SSdb *pSdb) {
  SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter));
  if (pIter == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // The iterator's own address serves as the suffix of the file name.
  char path[PATH_MAX + 100] = {0};
  snprintf(path, sizeof(path), "%s%ssdb.data.%" PRIu64, pSdb->tmpDir, TD_DIRSEP, (uint64_t)pIter);

  // NOTE(review): the name comes from strdup but sdbCloseIter releases it with taosMemoryFree;
  // confirm the two allocators are compatible.
  pIter->name = strdup(path);
  if (pIter->name != NULL) {
    return pIter;
  }

  taosMemoryFree(pIter);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}

// Releases a snapshot iterator: closes its file if one is open, removes the file named by
// pIter->name, frees the name and finally the iterator itself. A NULL iterator is ignored.
static void sdbCloseIter(SSdbIter *pIter) {
  if (pIter == NULL) {
    return;
  }

  if (pIter->file != NULL) {
    taosCloseFile(&pIter->file);
    pIter->file = NULL;
  }

  if (pIter->name != NULL) {
    // The removal result is not checked.
    taosRemoveFile(pIter->name);
    taosMemoryFree(pIter->name);
    pIter->name = NULL;
  }

  // Log while the iterator is still valid, then release it.
  mDebug("sdbiter:%p, is closed, total:%" PRId64, pIter, pIter->total);
  taosMemoryFree(pIter);
}

// Starts reading a snapshot of the sdb: copies "<currDir>/sdb.data" to the iterator's private
// file while holding pSdb->filelock, then opens that copy for reading.
//
// On success stores the iterator in *ppIter, reports the commit index/term/config captured
// under the lock through the optional `index`, `term` and `config` outputs (each may be NULL),
// and returns 0. On failure returns -1 with terrno set; the iterator is released and *ppIter
// is left untouched.
int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter, int64_t *index, int64_t *term, int64_t *config) {
  SSdbIter *pIter = sdbCreateIter(pSdb);
  if (pIter == NULL) return -1;

  char datafile[PATH_MAX] = {0};
  snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);

  taosThreadMutexLock(&pSdb->filelock);
  int64_t commitIndex = pSdb->commitIndex;
  int64_t commitTerm = pSdb->commitTerm;
  int64_t commitConfig = pSdb->commitConfig;

  // Capture errno right after the failing call, before the unlock can disturb it.
  int32_t copyErrno = 0;
  bool    copyFailed = taosCopyFile(datafile, pIter->name) < 0;
  if (copyFailed) {
    copyErrno = errno;
  }
  taosThreadMutexUnlock(&pSdb->filelock);

  if (copyFailed) {
    terrno = TAOS_SYSTEM_ERROR(copyErrno);
    mError("failed to copy sdb file %s to %s since %s", datafile, pIter->name, terrstr());
    sdbCloseIter(pIter);
    return -1;
  }

  pIter->file = taosOpenFile(pIter->name, TD_FILE_READ);
  if (pIter->file == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to open sdb file:%s since %s", pIter->name, terrstr());
    sdbCloseIter(pIter);
    return -1;
  }

  *ppIter = pIter;
  if (index != NULL) *index = commitIndex;
  if (term != NULL) *term = commitTerm;
  if (config != NULL) *config = commitConfig;

  mDebug("sdbiter:%p, is created to read snapshot, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
        pIter, commitIndex, commitTerm, commitConfig, pIter->name);
  return 0;
}

// Ends a snapshot read by releasing the iterator (which also removes its file). Always returns 0.
int32_t sdbStopRead(SSdb *pSdb, SSdbIter *pIter) {
  (void)pSdb;  // not needed to release the iterator

  sdbCloseIter(pIter);
  return 0;
}
// Reads the next chunk (at most 100 bytes) of the snapshot file behind pIter.
//
// On success returns 0 with *ppBuf pointing at a freshly allocated buffer holding *len bytes.
// At end of file returns 0 with *ppBuf == NULL and *len == 0. On a read error returns -1 with
// terrno set and the same NULL/0 outputs. If the buffer cannot be allocated, returns -1
// without touching the outputs.
int32_t sdbDoRead(SSdb *pSdb, SSdbIter *pIter, void **ppBuf, int32_t *len) {
  int32_t chunkSize = 100;
  void   *chunk = taosMemoryCalloc(1, chunkSize);
  if (chunk == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int32_t nread = taosReadFile(pIter->file, chunk, chunkSize);
  if (nread > 0 && nread <= chunkSize) {
    // Data available: hand the buffer over to the caller.
    pIter->total += nread;
    mDebug("sdbiter:%p, read:%d bytes from snapshot, total:%" PRId64, pIter, nread, pIter->total);
    *ppBuf = chunk;
    *len = nread;
    return 0;
  }

  // Nothing is handed out on end of file or on error.
  int32_t code = 0;
  if (nread == 0) {
    mDebug("sdbiter:%p, read snapshot to the end, total:%" PRId64, pIter, pIter->total);
  } else {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("sdbiter:%p, failed to read snapshot since %s, total:%" PRId64, pIter, terrstr(), pIter->total);
    code = -1;
  }

  *ppBuf = NULL;
  *len = 0;
  taosMemoryFree(chunk);
  return code;
}
// Starts receiving a snapshot: creates an iterator and opens (create + truncate) its private
// file for writing. On success stores the iterator in *ppIter and returns 0. On failure
// returns -1 with terrno set; the iterator is released and *ppIter is left untouched.
int32_t sdbStartWrite(SSdb *pSdb, SSdbIter **ppIter) {
  SSdbIter *pIter = sdbCreateIter(pSdb);
  if (pIter == NULL) return -1;

  pIter->file = taosOpenFile(pIter->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pIter->file == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to open %s since %s", pIter->name, terrstr());
    // Release the iterator and its name; otherwise both leak on this path.
    sdbCloseIter(pIter);
    return -1;
  }

  *ppIter = pIter;
  mDebug("sdbiter:%p, is created to write snapshot, file:%s", pIter, pIter->name);
  return 0;
}
// Finishes receiving a snapshot and always releases the iterator.
//
// When isApply is false the received file is discarded and 0 is returned. Otherwise the file
// is synced and closed, renamed over "<currDir>/sdb.data", and the sdb is reloaded from it
// with sdbReadFile. Returns 0 on success, or -1 with terrno set when the sync, the rename or
// the reload fails.
int32_t sdbStopWrite(SSdb *pSdb, SSdbIter *pIter, bool isApply) {
  if (!isApply) {
    // Log before closing: sdbCloseIter frees pIter.
    mDebug("sdbiter:%p, not apply to sdb", pIter);
    sdbCloseIter(pIter);
    return 0;
  }

  // Do not replace the current data file with a snapshot that may not have reached the disk.
  if (taosFsyncFile(pIter->file) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("sdbiter:%p, failed to sync file %s since %s", pIter, pIter->name, terrstr());
    sdbCloseIter(pIter);
    return -1;
  }
  taosCloseFile(&pIter->file);
  pIter->file = NULL;

  char datafile[PATH_MAX] = {0};
  snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
  if (taosRenameFile(pIter->name, datafile) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("sdbiter:%p, failed to rename file %s to %s since %s", pIter, pIter->name, datafile, terrstr());
    sdbCloseIter(pIter);
    return -1;
  }

  if (sdbReadFile(pSdb) != 0) {
    mError("sdbiter:%p, failed to read from %s since %s", pIter, datafile, terrstr());
    // Keep the reload error visible to the caller across the cleanup below.
    int32_t readErr = terrno;
    sdbCloseIter(pIter);
    terrno = readErr;
    return -1;
  }

  mDebug("sdbiter:%p, successfully applyed to sdb", pIter);
  sdbCloseIter(pIter);
  return 0;
}

// Appends `len` bytes from pBuf to the snapshot file behind pIter and adds them to
// pIter->total. Returns 0 when all bytes were written, or -1 with terrno set otherwise.
int32_t sdbDoWrite(SSdb *pSdb, SSdbIter *pIter, void *pBuf, int32_t len) {
  int32_t nwritten = taosWriteFile(pIter->file, pBuf, len);
  if (nwritten == len) {
    pIter->total += nwritten;
    mDebug("sdbiter:%p, write:%d bytes to snapshot, total:%" PRId64, pIter, nwritten, pIter->total);
    return 0;
  }

  // A short write is treated as a failure.
  terrno = TAOS_SYSTEM_ERROR(errno);
  mError("failed to write len:%d since %s, total:%" PRId64, len, terrstr(), pIter->total);
  return -1;
}