sdbFile.c 17.5 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "sdb.h"
S
Shengliang Guan 已提交
18
#include "tchecksum.h"
S
Shengliang Guan 已提交
19
#include "wal.h"
S
Shengliang Guan 已提交
20

S
Shengliang Guan 已提交
21
// Number of int64 slots stored in the file head for each of the maxId and
// tableVer arrays; slots with index >= SDB_MAX are written as 0 and ignored on read.
#define SDB_TABLE_SIZE   24
S
Shengliang Guan 已提交
22
// Size in bytes of the zero-filled reserve area that ends the file head.
#define SDB_RESERVE_SIZE 512
S
Shengliang Guan 已提交
23
// Version value written as the first int64 of the file head; a file whose
// first field differs from this is rejected as corrupted when read.
#define SDB_FILE_VER     1
S
Shengliang Guan 已提交
24

S
Shengliang Guan 已提交
25
// Invoke every registered deploy callback, visiting table types from
// SDB_MAX - 1 down to 0. Types without a callback are skipped.
// Returns 0 if every callback returned 0, or -1 at the first callback that fails.
static int32_t sdbDeployData(SSdb *pSdb) {
  mDebug("start to deploy sdb");

  int32_t type = SDB_MAX;
  while (--type >= 0) {
    SdbDeployFp deployFp = pSdb->deployFps[type];
    if (deployFp != NULL) {
      mDebug("start to deploy sdb:%s", sdbTableName(type));
      if (deployFp(pSdb->pMnode) != 0) {
        mError("failed to deploy sdb:%s since %s", sdbTableName(type), terrstr());
        return -1;
      }
    }
  }

  mDebug("sdb deploy successfully");
  return 0;
}

S
Shengliang Guan 已提交
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
// Drop all in-memory sdb content and reset the bookkeeping fields.
//
// First pass: free every row stored in every table hash.
// Second pass: clear each hash and zero its tableVer / maxId.
// Finally the apply* and commit* index/term/config fields are set to -1.
static void sdbResetData(SSdb *pSdb) {
  mDebug("start to reset sdb");

  for (ESdbType i = 0; i < SDB_MAX; ++i) {
    SHashObj *hash = pSdb->hashObjs[i];
    if (hash == NULL) continue;

    SSdbRow **ppRow = taosHashIterate(hash, NULL);
    while (ppRow != NULL) {
      SSdbRow *pRow = *ppRow;
      if (pRow != NULL) {
        sdbFreeRow(pSdb, pRow, true);
      }
      // Always advance the iterator, even for an empty slot; skipping the
      // advance would spin forever on the same entry.
      ppRow = taosHashIterate(hash, ppRow);
    }
  }

  for (ESdbType i = 0; i < SDB_MAX; ++i) {
    SHashObj *hash = pSdb->hashObjs[i];
    if (hash == NULL) continue;

    taosHashClear(pSdb->hashObjs[i]);
    pSdb->tableVer[i] = 0;
    pSdb->maxId[i] = 0;
    mDebug("sdb:%s is reset", sdbTableName(i));
  }

  pSdb->applyIndex = -1;
  pSdb->applyTerm = -1;
  pSdb->applyConfig = -1;
  pSdb->commitIndex = -1;
  pSdb->commitTerm = -1;
  pSdb->commitConfig = -1;
  mDebug("sdb reset successfully");
}

79
// Read exactly `size` bytes of the file head into pDst.
// Returns 0 on success. On failure returns -1 with terrno set to the system
// error (read returned a negative value) or TSDB_CODE_FILE_CORRUPTED (short read).
static int32_t sdbReadHeadField(TdFilePtr pFile, void *pDst, size_t size) {
  int32_t nread = taosReadFile(pFile, pDst, size);
  if (nread < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (nread != size) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }
  return 0;
}

// Parse the sdb file head from pFile into pSdb.
//
// Fields are consumed in this order: file version (must equal SDB_FILE_VER),
// applyIndex, applyTerm, applyConfig, SDB_TABLE_SIZE maxId slots,
// SDB_TABLE_SIZE tableVer slots, and SDB_RESERVE_SIZE reserved bytes.
// Slots whose index is >= SDB_MAX are read and discarded.
// Returns 0 on success, -1 with terrno set otherwise.
static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
  int64_t sver = 0;
  if (sdbReadHeadField(pFile, &sver, sizeof(int64_t)) != 0) {
    return -1;
  }
  if (sver != SDB_FILE_VER) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

  if (sdbReadHeadField(pFile, &pSdb->applyIndex, sizeof(int64_t)) != 0) {
    return -1;
  }
  if (sdbReadHeadField(pFile, &pSdb->applyTerm, sizeof(int64_t)) != 0) {
    return -1;
  }
  if (sdbReadHeadField(pFile, &pSdb->applyConfig, sizeof(int64_t)) != 0) {
    return -1;
  }

  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t maxId = 0;
    if (sdbReadHeadField(pFile, &maxId, sizeof(int64_t)) != 0) {
      return -1;
    }
    if (slot < SDB_MAX) {
      pSdb->maxId[slot] = maxId;
    }
  }

  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t ver = 0;
    if (sdbReadHeadField(pFile, &ver, sizeof(int64_t)) != 0) {
      return -1;
    }
    if (slot < SDB_MAX) {
      pSdb->tableVer[slot] = ver;
    }
  }

  // The reserved area is read only to move past it; its content is not used.
  char reserve[SDB_RESERVE_SIZE] = {0};
  return sdbReadHeadField(pFile, reserve, sizeof(reserve));
}

171
// Write exactly `size` bytes from pSrc to the file head.
// Returns 0 on success, or -1 with terrno set to the system error when the
// number of bytes reported written differs from `size`.
static int32_t sdbWriteHeadField(TdFilePtr pFile, void *pSrc, size_t size) {
  if (taosWriteFile(pFile, pSrc, size) != size) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  return 0;
}

// Serialize the sdb file head from pSdb into pFile.
//
// Fields are emitted in this order: SDB_FILE_VER, applyIndex, applyTerm,
// applyConfig, SDB_TABLE_SIZE maxId slots, SDB_TABLE_SIZE tableVer slots,
// and SDB_RESERVE_SIZE zero bytes. Slots whose index is >= SDB_MAX are
// written as 0.
// Returns 0 on success, -1 with terrno set otherwise.
static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
  int64_t sver = SDB_FILE_VER;
  if (sdbWriteHeadField(pFile, &sver, sizeof(int64_t)) != 0) {
    return -1;
  }

  if (sdbWriteHeadField(pFile, &pSdb->applyIndex, sizeof(int64_t)) != 0) {
    return -1;
  }
  if (sdbWriteHeadField(pFile, &pSdb->applyTerm, sizeof(int64_t)) != 0) {
    return -1;
  }
  if (sdbWriteHeadField(pFile, &pSdb->applyConfig, sizeof(int64_t)) != 0) {
    return -1;
  }

  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t maxId = (slot < SDB_MAX) ? pSdb->maxId[slot] : 0;
    if (sdbWriteHeadField(pFile, &maxId, sizeof(int64_t)) != 0) {
      return -1;
    }
  }

  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t ver = (slot < SDB_MAX) ? pSdb->tableVer[slot] : 0;
    if (sdbWriteHeadField(pFile, &ver, sizeof(int64_t)) != 0) {
      return -1;
    }
  }

  char reserve[SDB_RESERVE_SIZE] = {0};
  return sdbWriteHeadField(pFile, reserve, sizeof(reserve));
}

S
Shengliang Guan 已提交
224
// Load <currDir>/sdb.data into pSdb.
//
// The file head is parsed first, then each record is read as an SSdbRaw
// header followed by dataLen payload bytes and an int32 checksum. Every
// record is checksum-verified and handed to sdbWriteWithoutFree().
//
// Returns 0 when the whole file was replayed, and also when the file could
// not be opened (terrno is still set in that case and only a debug message is
// logged). Returns -1 if the record buffer cannot be allocated or the head
// cannot be read. Any failure while reading records returns the non-zero
// error code, which is also stored in terrno.
//
// On success the commit index/term/config are set from the apply values read
// from the head, and tableVer is restored to the values stored in the head.
static int32_t sdbReadFileImp(SSdb *pSdb) {
  int32_t code = 0;
  int32_t readLen = 0;
  int64_t ret = 0;
  char    file[PATH_MAX] = {0};

  snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
  mDebug("start to read sdb file:%s", file);

  // Largest payload that still fits in the record buffer together with the
  // SSdbRaw header and the trailing checksum.
  const int64_t bufLen = (int64_t)WAL_MAX_SIZE + 100;
  const int64_t maxDataLen = bufLen - (int64_t)sizeof(SSdbRaw) - (int64_t)sizeof(int32_t);

  SSdbRaw *pRaw = taosMemoryMalloc(WAL_MAX_SIZE + 100);
  if (pRaw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed read sdb file since %s", terrstr());
    return -1;
  }

  TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
    taosMemoryFree(pRaw);
    terrno = TAOS_SYSTEM_ERROR(errno);
    mDebug("failed to read sdb file:%s since %s", file, terrstr());
    return 0;
  }

  if (sdbReadFileHead(pSdb, pFile) != 0) {
    mError("failed to read sdb file:%s head since %s", file, terrstr());
    taosMemoryFree(pRaw);
    taosCloseFile(&pFile);
    return -1;
  }

  // Remember the table versions from the head; they are written back after
  // all records have been replayed.
  int64_t tableVer[SDB_MAX] = {0};
  memcpy(tableVer, pSdb->tableVer, sizeof(tableVer));

  while (1) {
    readLen = sizeof(SSdbRaw);
    ret = taosReadFile(pFile, pRaw, readLen);
    if (ret == 0) break;  // clean end of file

    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    if (ret != readLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    // Reject a length that would make the payload read overrun the buffer.
    if (pRaw->dataLen < 0 || pRaw->dataLen > maxDataLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    readLen = pRaw->dataLen + sizeof(int32_t);
    ret = taosReadFile(pFile, pRaw->pData, readLen);
    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    if (ret != readLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t);
    if ((!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen)) != 0) {
      code = TSDB_CODE_CHECKSUM_ERROR;
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    code = sdbWriteWithoutFree(pSdb, pRaw);
    if (code != 0) {
      mError("failed to read sdb file:%s since %s", file, terrstr());
      goto _OVER;
    }
  }

  // Only reached after a clean end of file, so code is 0 here.
  pSdb->commitIndex = pSdb->applyIndex;
  pSdb->commitTerm = pSdb->applyTerm;
  pSdb->commitConfig = pSdb->applyConfig;
  memcpy(pSdb->tableVer, tableVer, sizeof(tableVer));
  mDebug("read sdb file:%s successfully, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64, file,
         pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig);

_OVER:
  taosCloseFile(&pFile);
  sdbFreeRaw(pRaw);

  terrno = code;
  return code;
}

S
Shengliang Guan 已提交
320 321 322 323 324 325
// Reload the sdb from disk while holding pSdb->filelock.
// In-memory data is reset before the load, and reset again if the load
// fails. Returns the result of sdbReadFileImp().
int32_t sdbReadFile(SSdb *pSdb) {
  int32_t code;

  taosThreadMutexLock(&pSdb->filelock);

  sdbResetData(pSdb);
  code = sdbReadFileImp(pSdb);
  if (code != 0) {
    mError("failed to read sdb file since %s", terrstr());
    sdbResetData(pSdb);
  }

  taosThreadMutexUnlock(&pSdb->filelock);

  return code;
}

S
Shengliang Guan 已提交
334
// Write the in-memory sdb to <tmpDir>/sdb.data, then rename it over
// <currDir>/sdb.data.
//
// The file head is written first. Tables are then visited from SDB_MAX - 1
// down to 0; tables without an encode callback are skipped. For each table,
// rows whose status is SDB_STATUS_READY or SDB_STATUS_DROPPING are encoded
// and written as the raw record followed by an int32 checksum. The table's
// rwlock is write-locked while its rows are iterated.
//
// Writing stops at the first row that fails to encode or write. On success
// the file is fsynced, closed and renamed, and the commit index/term/config
// are set from the apply values.
//
// Returns 0 on success. Returns -1 if the temp file cannot be opened or the
// head cannot be written; otherwise returns the non-zero error code, which is
// also stored in terrno.
static int32_t sdbWriteFileImp(SSdb *pSdb) {
  int32_t code = 0;

  char tmpfile[PATH_MAX] = {0};
  snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
  char curfile[PATH_MAX] = {0};
  snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);

  mDebug("start to write sdb file, apply index:%" PRId64 " term:%" PRId64 " config:%" PRId64 ", commit index:%" PRId64
         " term:%" PRId64 " config:%" PRId64 ", file:%s",
         pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig, pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig,
         curfile);

  TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to open sdb file:%s for write since %s", tmpfile, terrstr());
    return -1;
  }

  if (sdbWriteFileHead(pSdb, pFile) != 0) {
    mError("failed to write sdb file:%s head since %s", tmpfile, terrstr());
    taosCloseFile(&pFile);
    return -1;
  }

  for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
    SdbEncodeFp encodeFp = pSdb->encodeFps[i];
    if (encodeFp == NULL) continue;

    mDebug("write %s to sdb file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));

    SHashObj       *hash = pSdb->hashObjs[i];
    TdThreadRwlock *pLock = &pSdb->locks[i];
    taosThreadRwlockWrlock(pLock);

    SSdbRow **ppRow = taosHashIterate(hash, NULL);
    while (ppRow != NULL) {
      SSdbRow *pRow = *ppRow;
      if (pRow == NULL) {
        ppRow = taosHashIterate(hash, ppRow);
        continue;
      }

      if (pRow->status != SDB_STATUS_READY && pRow->status != SDB_STATUS_DROPPING) {
        sdbPrintOper(pSdb, pRow, "not-write");
        ppRow = taosHashIterate(hash, ppRow);
        continue;
      }

      sdbPrintOper(pSdb, pRow, "write");

      SSdbRaw *pRaw = (*encodeFp)(pRow->pObj);
      if (pRaw != NULL) {
        pRaw->status = pRow->status;
        int32_t writeLen = sizeof(SSdbRaw) + pRaw->dataLen;
        if (taosWriteFile(pFile, pRaw, writeLen) != writeLen) {
          code = TAOS_SYSTEM_ERROR(errno);
          taosHashCancelIterate(hash, ppRow);
          sdbFreeRaw(pRaw);
          break;
        }

        int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
        if (taosWriteFile(pFile, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
          code = TAOS_SYSTEM_ERROR(errno);
          taosHashCancelIterate(hash, ppRow);
          sdbFreeRaw(pRaw);
          break;
        }
      } else {
        code = TSDB_CODE_SDB_APP_ERROR;
        taosHashCancelIterate(hash, ppRow);
        break;
      }

      sdbFreeRaw(pRaw);
      ppRow = taosHashIterate(hash, ppRow);
    }
    taosThreadRwlockUnlock(pLock);

    // The temp file is already unusable after a failure; do not keep
    // encoding and writing the remaining tables into it.
    if (code != 0) break;
  }

  if (code == 0) {
    code = taosFsyncFile(pFile);
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to sync sdb file:%s since %s", tmpfile, tstrerror(code));
    }
  }

  taosCloseFile(&pFile);

  if (code == 0) {
    code = taosRenameFile(tmpfile, curfile);
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to write sdb file:%s since %s", curfile, tstrerror(code));
    }
  }

  if (code != 0) {
    mError("failed to write sdb file:%s since %s", curfile, tstrerror(code));
  } else {
    pSdb->commitIndex = pSdb->applyIndex;
    pSdb->commitTerm = pSdb->applyTerm;
    pSdb->commitConfig = pSdb->applyConfig;
    mDebug("write sdb file successfully, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
           pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig, curfile);
  }

  terrno = code;
  return code;
}

S
Shengliang Guan 已提交
448
// Persist the sdb to disk if applyIndex differs from commitIndex.
//
// The index comparison happens before pSdb->filelock is taken. Under the
// lock, and when pSdb->pWal is set, walBeginSnapshot() is called before the
// write and walEndSnapshot() after it; each step runs only if the previous
// one returned 0. Returns 0 on success or when there is nothing to write,
// otherwise the first non-zero code.
int32_t sdbWriteFile(SSdb *pSdb) {
  int32_t code = 0;

  if (pSdb->applyIndex == pSdb->commitIndex) {
    return 0;
  }

  taosThreadMutexLock(&pSdb->filelock);

  if (pSdb->pWal != NULL) {
    code = walBeginSnapshot(pSdb->pWal, pSdb->applyIndex);
  }

  if (code == 0) {
    code = sdbWriteFileImp(pSdb);
  }

  if (code == 0 && pSdb->pWal != NULL) {
    code = walEndSnapshot(pSdb->pWal);
  }

  if (code != 0) {
    mError("failed to write sdb file since %s", terrstr());
  }

  taosThreadMutexUnlock(&pSdb->filelock);
  return code;
}

S
Shengliang Guan 已提交
473
// Run the deploy callbacks and then write the sdb file.
// Returns 0 when both steps succeed, -1 if either fails; the write step is
// not attempted when the deploy step fails.
int32_t sdbDeploy(SSdb *pSdb) {
  if (sdbDeployData(pSdb) != 0 || sdbWriteFile(pSdb) != 0) {
    return -1;
  }
  return 0;
}
S
Shengliang Guan 已提交
484

S
Shengliang Guan 已提交
485
// Allocate a zeroed snapshot iterator and give it a file name of the form
// <tmpDir>/sdb.data.<iterator address as an unsigned integer>.
// Returns the iterator, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when
// either allocation fails. The file itself is not created here.
static SSdbIter *sdbCreateIter(SSdb *pSdb) {
  SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter));
  if (pIter == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  char path[PATH_MAX + 100] = {0};
  snprintf(path, sizeof(path), "%s%ssdb.data.%" PRIu64, pSdb->tmpDir, TD_DIRSEP, (uint64_t)pIter);

  pIter->name = strdup(path);
  if (pIter->name != NULL) {
    return pIter;
  }

  taosMemoryFree(pIter);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}

S
Shengliang Guan 已提交
504
// Release a snapshot iterator: close its file if open, remove the file named
// by pIter->name and free that name, then free the iterator itself.
// A NULL iterator is ignored.
static void sdbCloseIter(SSdbIter *pIter) {
  if (pIter == NULL) {
    return;
  }

  if (pIter->file != NULL) {
    taosCloseFile(&pIter->file);
    pIter->file = NULL;
  }

  if (pIter->name != NULL) {
    taosRemoveFile(pIter->name);
    taosMemoryFree(pIter->name);
    pIter->name = NULL;
  }

  // Log before the final free so the iterator is still valid here.
  mInfo("sdbiter:%p, is closed, total:%" PRId64, pIter, pIter->total);
  taosMemoryFree(pIter);
}

S
Shengliang Guan 已提交
522 523 524
// Open a read iterator over a private copy of <currDir>/sdb.data.
//
// The data file is copied to the iterator's own file while pSdb->filelock is
// held, and the copy is then opened for reading. On success *ppIter receives
// the iterator and 0 is returned. On failure the iterator is closed, terrno
// is set, and -1 is returned; *ppIter is left untouched.
int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) {
  SSdbIter *pIter = sdbCreateIter(pSdb);
  if (pIter == NULL) {
    return -1;
  }

  char datafile[PATH_MAX] = {0};
  snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);

  taosThreadMutexLock(&pSdb->filelock);
  // Captured under the lock, used only for the log message below.
  int64_t commitIndex = pSdb->commitIndex;
  int64_t commitTerm = pSdb->commitTerm;
  int64_t commitConfig = pSdb->commitConfig;
  int64_t copyRet = taosCopyFile(datafile, pIter->name);
  taosThreadMutexUnlock(&pSdb->filelock);

  if (copyRet < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to copy sdb file %s to %s since %s", datafile, pIter->name, terrstr());
    sdbCloseIter(pIter);
    return -1;
  }

  pIter->file = taosOpenFile(pIter->name, TD_FILE_READ);
  if (pIter->file == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to open sdb file:%s since %s", pIter->name, terrstr());
    sdbCloseIter(pIter);
    return -1;
  }

  *ppIter = pIter;
  mInfo("sdbiter:%p, is created to read snapshot, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
        pIter, commitIndex, commitTerm, commitConfig, pIter->name);
  return 0;
}

S
Shengliang Guan 已提交
556 557 558 559
// Finish a snapshot read by closing the iterator. pSdb is not used.
// Always returns 0.
int32_t sdbStopRead(SSdb *pSdb, SSdbIter *pIter) {
  (void)pSdb;
  sdbCloseIter(pIter);
  return 0;
}
S
Shengliang Guan 已提交
560

S
Shengliang Guan 已提交
561
// Read the next chunk (at most 100 bytes) of the snapshot file.
//
// On data:        *ppBuf = newly allocated buffer, *len = bytes read, returns 0.
//                 This function does not free that buffer.
// On end of file: *ppBuf = NULL, *len = 0, returns 0.
// On read error:  *ppBuf = NULL, *len = 0, terrno set, returns -1.
// If the buffer cannot be allocated, returns -1 with terrno set and leaves
// *ppBuf and *len untouched.
int32_t sdbDoRead(SSdb *pSdb, SSdbIter *pIter, void **ppBuf, int32_t *len) {
  int32_t maxlen = 100;
  void   *pBuf = taosMemoryCalloc(1, maxlen);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int32_t readlen = taosReadFile(pIter->file, pBuf, maxlen);

  if (readlen > 0 && readlen <= maxlen) {
    pIter->total += readlen;
    mInfo("sdbiter:%p, read:%d bytes from snapshot, total:%" PRId64, pIter, readlen, pIter->total);
    *ppBuf = pBuf;
    *len = readlen;
    return 0;
  }

  // Either end of file (readlen == 0) or a failed read; nothing is handed out.
  int32_t code = 0;
  if (readlen == 0) {
    mInfo("sdbiter:%p, read snapshot to the end, total:%" PRId64, pIter, pIter->total);
  } else {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("sdbiter:%p, failed to read snapshot since %s, total:%" PRId64, pIter, terrstr(), pIter->total);
    code = -1;
  }

  *ppBuf = NULL;
  *len = 0;
  taosMemoryFree(pBuf);
  return code;
}
S
Shengliang Guan 已提交
591

S
Shengliang Guan 已提交
592 593 594
// Open a write iterator backed by a fresh temporary snapshot file.
//
// On success *ppIter receives the iterator and 0 is returned. On failure
// terrno is set, the iterator is released, *ppIter is left untouched, and -1
// is returned.
int32_t sdbStartWrite(SSdb *pSdb, SSdbIter **ppIter) {
  SSdbIter *pIter = sdbCreateIter(pSdb);
  if (pIter == NULL) return -1;

  pIter->file = taosOpenFile(pIter->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pIter->file == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to open %s since %s", pIter->name, terrstr());
    // Release the iterator and its name string; they were leaked before.
    sdbCloseIter(pIter);
    return -1;
  }

  *ppIter = pIter;
  mInfo("sdbiter:%p, is created to write snapshot, file:%s", pIter, pIter->name);
  return 0;
}
S
Shengliang Guan 已提交
607

S
Shengliang Guan 已提交
608 609 610 611 612 613 614
// Finish a snapshot write.
//
// When isApply is false the iterator is closed, which also removes its
// temporary file, and 0 is returned.
//
// When isApply is true the snapshot file is fsynced, closed and renamed over
// <currDir>/sdb.data, and the sdb is reloaded from it with sdbReadFile().
// Returns 0 on success, -1 with terrno set if the sync, rename or reload
// fails. The iterator is always released before returning.
int32_t sdbStopWrite(SSdb *pSdb, SSdbIter *pIter, bool isApply) {
  if (!isApply) {
    // Log first: the iterator is freed by sdbCloseIter().
    mInfo("sdbiter:%p, not apply to sdb", pIter);
    sdbCloseIter(pIter);
    return 0;
  }

  // Do not replace the current data file with a snapshot that failed to sync.
  if (taosFsyncFile(pIter->file) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("sdbiter:%p, failed to sync file %s since %s", pIter, pIter->name, terrstr());
    sdbCloseIter(pIter);
    return -1;
  }
  taosCloseFile(&pIter->file);
  pIter->file = NULL;

  char datafile[PATH_MAX] = {0};
  snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
  if (taosRenameFile(pIter->name, datafile) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("sdbiter:%p, failed to rename file %s to %s since %s", pIter, pIter->name, datafile, terrstr());
    sdbCloseIter(pIter);
    return -1;
  }

  int32_t code = 0;
  if (sdbReadFile(pSdb) != 0) {
    mError("sdbiter:%p, failed to read from %s since %s", pIter, datafile, terrstr());
    code = -1;
  } else {
    mInfo("sdbiter:%p, successfully applyed to sdb", pIter);
  }

  // Close only after the last log line that mentions the iterator. terrno is
  // saved and restored around the close so the caller still sees the value
  // left by sdbReadFile().
  int32_t savedErrno = terrno;
  sdbCloseIter(pIter);
  terrno = savedErrno;

  return code;
}

// Append len bytes from pBuf to the iterator's snapshot file.
// On a complete write the byte count is added to pIter->total and 0 is
// returned; otherwise terrno is set and -1 is returned.
int32_t sdbDoWrite(SSdb *pSdb, SSdbIter *pIter, void *pBuf, int32_t len) {
  int32_t writelen = taosWriteFile(pIter->file, pBuf, len);

  if (writelen == len) {
    pIter->total += writelen;
    mInfo("sdbiter:%p, write:%d bytes to snapshot, total:%" PRId64, pIter, writelen, pIter->total);
    return 0;
  }

  terrno = TAOS_SYSTEM_ERROR(errno);
  mError("failed to write len:%d since %s, total:%" PRId64, len, terrstr(), pIter->total);
  return -1;
}