sdbFile.c 18.2 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "sdb.h"
S
Shengliang Guan 已提交
18
#include "tchecksum.h"
S
Shengliang Guan 已提交
19
#include "wal.h"
S
Shengliang Guan 已提交
20

S
Shengliang Guan 已提交
21
// Number of int64 maxId slots and int64 tableVer slots stored in the sdb.data
// file head; slots whose index is >= SDB_MAX are written as 0 and ignored on read.
#define SDB_TABLE_SIZE   24
S
Shengliang Guan 已提交
22
// Size in bytes of the zero-filled reserve area that ends the sdb.data file head.
#define SDB_RESERVE_SIZE 512
S
Shengliang Guan 已提交
23
// Format version stored as the first int64 of sdb.data; sdbReadFileHead rejects
// a file carrying any other value as corrupted.
#define SDB_FILE_VER     1
S
Shengliang Guan 已提交
24

S
Shengliang Guan 已提交
25
/*
 * Invoke every registered deploy callback, walking the table types from
 * SDB_MAX - 1 down to 0. Types without a callback are skipped.
 *
 * Returns 0 when all callbacks succeed, -1 as soon as one returns non-zero.
 */
static int32_t sdbDeployData(SSdb *pSdb) {
  mInfo("start to deploy sdb");

  int32_t type = SDB_MAX;
  while (--type >= 0) {
    SdbDeployFp deployFp = pSdb->deployFps[type];
    if (deployFp != NULL) {
      mInfo("start to deploy sdb:%s", sdbTableName(type));
      if (deployFp(pSdb->pMnode) != 0) {
        mError("failed to deploy sdb:%s since %s", sdbTableName(type), terrstr());
        return -1;
      }
    }
  }

  mInfo("sdb deploy success");
  return 0;
}

S
Shengliang Guan 已提交
43
/*
 * Drop all in-memory sdb content: free every row of every table, clear the
 * hash tables, zero the per-table version / max-id counters and set the
 * apply/commit index, term and config back to -1.
 */
static void sdbResetData(SSdb *pSdb) {
  mInfo("start to reset sdb");

  // First pass: release the row objects referenced by each table.
  for (ESdbType i = 0; i < SDB_MAX; ++i) {
    SHashObj *hash = pSdb->hashObjs[i];
    if (hash == NULL) continue;

    SSdbRow **ppRow = taosHashIterate(hash, NULL);
    while (ppRow != NULL) {
      SSdbRow *pRow = *ppRow;
      if (pRow != NULL) {
        sdbFreeRow(pSdb, pRow, true);
      }
      // Always advance the iterator; skipping this for a NULL slot would spin
      // forever on the same entry.
      ppRow = taosHashIterate(hash, ppRow);
    }
  }

  // Second pass: empty the tables themselves and reset their counters.
  for (ESdbType i = 0; i < SDB_MAX; ++i) {
    SHashObj *hash = pSdb->hashObjs[i];
    if (hash == NULL) continue;

    taosHashClear(pSdb->hashObjs[i]);
    pSdb->tableVer[i] = 0;
    pSdb->maxId[i] = 0;
    mInfo("sdb:%s is reset", sdbTableName(i));
  }

  pSdb->applyIndex = -1;
  pSdb->applyTerm = -1;
  pSdb->applyConfig = -1;
  pSdb->commitIndex = -1;
  pSdb->commitTerm = -1;
  pSdb->commitConfig = -1;
  mInfo("sdb reset success");
}

79
/*
 * Read exactly `size` bytes from pFile into pBuf.
 * Returns 0 on success. On failure returns -1 with terrno set to the system
 * error (read failed) or TSDB_CODE_FILE_CORRUPTED (short read).
 */
static int32_t sdbReadExact(TdFilePtr pFile, void *pBuf, int64_t size) {
  int64_t nread = taosReadFile(pFile, pBuf, size);
  if (nread < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (nread != size) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }
  return 0;
}

/*
 * Parse the fixed-size head of sdb.data, in order: file version, apply index,
 * apply term, apply config, SDB_TABLE_SIZE max ids, SDB_TABLE_SIZE table
 * versions and the reserve area. Values are stored into pSdb; array slots at
 * index >= SDB_MAX are consumed but discarded.
 *
 * Returns 0 on success, -1 with terrno set otherwise.
 */
static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
  int64_t sver = 0;
  if (sdbReadExact(pFile, &sver, sizeof(int64_t)) != 0) return -1;
  if (sver != SDB_FILE_VER) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

  if (sdbReadExact(pFile, &pSdb->applyIndex, sizeof(int64_t)) != 0) return -1;
  if (sdbReadExact(pFile, &pSdb->applyTerm, sizeof(int64_t)) != 0) return -1;
  if (sdbReadExact(pFile, &pSdb->applyConfig, sizeof(int64_t)) != 0) return -1;

  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t maxId = 0;
    if (sdbReadExact(pFile, &maxId, sizeof(int64_t)) != 0) return -1;
    if (slot < SDB_MAX) {
      pSdb->maxId[slot] = maxId;
    }
  }

  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t ver = 0;
    if (sdbReadExact(pFile, &ver, sizeof(int64_t)) != 0) return -1;
    if (slot < SDB_MAX) {
      pSdb->tableVer[slot] = ver;
    }
  }

  // The reserve area carries no information; it is only consumed.
  char reserve[SDB_RESERVE_SIZE] = {0};
  return sdbReadExact(pFile, reserve, sizeof(reserve));
}

171
/*
 * Write exactly `size` bytes from pBuf to pFile.
 * Returns 0 on success, -1 with terrno set to the system error otherwise.
 */
static int32_t sdbWriteExact(TdFilePtr pFile, void *pBuf, int64_t size) {
  if (taosWriteFile(pFile, pBuf, size) != size) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  return 0;
}

/*
 * Emit the fixed-size head of sdb.data, in order: file version, apply index,
 * apply term, apply config, SDB_TABLE_SIZE max ids, SDB_TABLE_SIZE table
 * versions and a zero-filled reserve area. Array slots at index >= SDB_MAX
 * are written as 0.
 *
 * Returns 0 on success, -1 with terrno set otherwise.
 */
static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
  int64_t sver = SDB_FILE_VER;
  if (sdbWriteExact(pFile, &sver, sizeof(int64_t)) != 0) return -1;

  if (sdbWriteExact(pFile, &pSdb->applyIndex, sizeof(int64_t)) != 0) return -1;
  if (sdbWriteExact(pFile, &pSdb->applyTerm, sizeof(int64_t)) != 0) return -1;
  if (sdbWriteExact(pFile, &pSdb->applyConfig, sizeof(int64_t)) != 0) return -1;

  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t maxId = (slot < SDB_MAX) ? pSdb->maxId[slot] : 0;
    if (sdbWriteExact(pFile, &maxId, sizeof(int64_t)) != 0) return -1;
  }

  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t ver = (slot < SDB_MAX) ? pSdb->tableVer[slot] : 0;
    if (sdbWriteExact(pFile, &ver, sizeof(int64_t)) != 0) return -1;
  }

  char reserve[SDB_RESERVE_SIZE] = {0};
  return sdbWriteExact(pFile, reserve, sizeof(reserve));
}

S
Shengliang Guan 已提交
224
/*
 * Load <currDir>/sdb.data into pSdb.
 *
 * The file head is parsed first, then records are read one by one: an SSdbRaw
 * header, dataLen payload bytes and a trailing int32 checksum. Each record is
 * verified and applied with sdbWriteWithoutFree. On success the commit
 * index/term/config are set from the apply values read from the head, and the
 * table versions from the head are restored (they are saved before the rows
 * are applied).
 *
 * Returns 0 on success or when the file cannot be opened (terrno is still set
 * in that case); returns a non-zero code, also stored in terrno, on any read,
 * corruption, checksum or apply failure.
 */
static int32_t sdbReadFileImp(SSdb *pSdb) {
  int32_t code = 0;
  int32_t readLen = 0;
  int64_t ret = 0;
  char    file[PATH_MAX] = {0};

  // Capacity of the record buffer; every record must fit in it.
  const int64_t bufLen = TSDB_MAX_MSG_SIZE + 100;

  snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
  mInfo("start to read sdb file:%s", file);

  SSdbRaw *pRaw = taosMemoryMalloc(bufLen);
  if (pRaw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed read sdb file since %s", terrstr());
    return -1;
  }

  TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
    // An unopenable file is not treated as an error by this function.
    taosMemoryFree(pRaw);
    terrno = TAOS_SYSTEM_ERROR(errno);
    mDebug("failed to read sdb file:%s since %s", file, terrstr());
    return 0;
  }

  if (sdbReadFileHead(pSdb, pFile) != 0) {
    mError("failed to read sdb file:%s head since %s", file, terrstr());
    taosMemoryFree(pRaw);
    taosCloseFile(&pFile);
    return -1;
  }

  // Keep the versions from the head; they are written back after the rows
  // have been applied.
  int64_t tableVer[SDB_MAX] = {0};
  memcpy(tableVer, pSdb->tableVer, sizeof(tableVer));

  while (1) {
    readLen = sizeof(SSdbRaw);
    ret = taosReadFile(pFile, pRaw, readLen);
    if (ret == 0) break;  // clean end of file

    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    if (ret != readLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    // dataLen comes from disk: reject values that would write past the buffer.
    int64_t dataLen = (int64_t)pRaw->dataLen;
    if (dataLen < 0 || (int64_t)sizeof(SSdbRaw) + dataLen + (int64_t)sizeof(int32_t) > bufLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    readLen = pRaw->dataLen + sizeof(int32_t);
    ret = taosReadFile(pFile, pRaw->pData, readLen);
    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    if (ret != readLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t);
    if ((!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen)) != 0) {
      code = TSDB_CODE_CHECKSUM_ERROR;
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    code = sdbWriteWithoutFree(pSdb, pRaw);
    if (code != 0) {
      mError("failed to read sdb file:%s since %s", file, terrstr());
      goto _OVER;
    }
  }

  // Only reached when every record was read and applied.
  code = 0;
  pSdb->commitIndex = pSdb->applyIndex;
  pSdb->commitTerm = pSdb->applyTerm;
  pSdb->commitConfig = pSdb->applyConfig;
  memcpy(pSdb->tableVer, tableVer, sizeof(tableVer));
  mInfo("read sdb file:%s success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64, file, pSdb->commitIndex,
        pSdb->commitTerm, pSdb->commitConfig);

_OVER:
  taosCloseFile(&pFile);
  sdbFreeRaw(pRaw);

  terrno = code;
  return code;
}

S
Shengliang Guan 已提交
320 321 322 323 324 325
/*
 * Replace the in-memory sdb content with what is stored in sdb.data.
 * Runs under pSdb->filelock. Memory is reset before loading and reset again
 * if loading fails. Returns the result of sdbReadFileImp.
 */
int32_t sdbReadFile(SSdb *pSdb) {
  taosThreadMutexLock(&pSdb->filelock);

  sdbResetData(pSdb);

  int32_t result = sdbReadFileImp(pSdb);
  if (result != 0) {
    mError("failed to read sdb file since %s", terrstr());
    // Discard whatever was partially loaded.
    sdbResetData(pSdb);
  }

  taosThreadMutexUnlock(&pSdb->filelock);
  return result;
}

S
Shengliang Guan 已提交
334
/*
 * Serialize the whole sdb into <tmpDir>/sdb.data, fsync it and rename it over
 * <currDir>/sdb.data.
 *
 * Tables are written from SDB_MAX - 1 down to 0; only rows whose status is
 * SDB_STATUS_READY or SDB_STATUS_DROPPING are written. Each record is the
 * encoded SSdbRaw followed by an int32 checksum. Each table's rwlock is
 * write-locked while its rows are iterated.
 *
 * On success commitIndex/commitTerm/commitConfig are set from the apply
 * values. Returns 0 on success; on failure returns a non-zero code that is
 * also stored in terrno (the early open / head-write failures return -1 with
 * terrno set by the failing step).
 */
static int32_t sdbWriteFileImp(SSdb *pSdb) {
  int32_t code = 0;

  char tmpfile[PATH_MAX] = {0};
  snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
  char curfile[PATH_MAX] = {0};
  snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);

  mInfo("start to write sdb file, apply index:%" PRId64 " term:%" PRId64 " config:%" PRId64 ", commit index:%" PRId64
        " term:%" PRId64 " config:%" PRId64 ", file:%s",
        pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig, pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig,
        curfile);

  TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to open sdb file:%s for write since %s", tmpfile, terrstr());
    return -1;
  }

  if (sdbWriteFileHead(pSdb, pFile) != 0) {
    mError("failed to write sdb file:%s head since %s", tmpfile, terrstr());
    taosCloseFile(&pFile);
    return -1;
  }

  // Stop visiting further tables once a row failed: the file is unusable.
  for (int32_t i = SDB_MAX - 1; i >= 0 && code == 0; --i) {
    SdbEncodeFp encodeFp = pSdb->encodeFps[i];
    if (encodeFp == NULL) continue;

    mInfo("write %s to sdb file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));

    SHashObj       *hash = pSdb->hashObjs[i];
    TdThreadRwlock *pLock = &pSdb->locks[i];
    taosThreadRwlockWrlock(pLock);

    SSdbRow **ppRow = taosHashIterate(hash, NULL);
    while (ppRow != NULL) {
      SSdbRow *pRow = *ppRow;
      if (pRow == NULL) {
        ppRow = taosHashIterate(hash, ppRow);
        continue;
      }

      if (pRow->status != SDB_STATUS_READY && pRow->status != SDB_STATUS_DROPPING) {
        sdbPrintOper(pSdb, pRow, "not-write");
        ppRow = taosHashIterate(hash, ppRow);
        continue;
      }

      sdbPrintOper(pSdb, pRow, "write");

      SSdbRaw *pRaw = (*encodeFp)(pRow->pObj);
      if (pRaw == NULL) {
        code = TSDB_CODE_SDB_APP_ERROR;
        taosHashCancelIterate(hash, ppRow);
        break;
      }

      pRaw->status = pRow->status;
      int32_t writeLen = sizeof(SSdbRaw) + pRaw->dataLen;
      if (taosWriteFile(pFile, pRaw, writeLen) != writeLen) {
        code = TAOS_SYSTEM_ERROR(errno);
        taosHashCancelIterate(hash, ppRow);
        sdbFreeRaw(pRaw);
        break;
      }

      int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
      if (taosWriteFile(pFile, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
        code = TAOS_SYSTEM_ERROR(errno);
        taosHashCancelIterate(hash, ppRow);
        sdbFreeRaw(pRaw);
        break;
      }

      sdbFreeRaw(pRaw);
      ppRow = taosHashIterate(hash, ppRow);
    }
    taosThreadRwlockUnlock(pLock);
  }

  if (code == 0) {
    code = taosFsyncFile(pFile);
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to sync sdb file:%s since %s", tmpfile, tstrerror(code));
    }
  }

  taosCloseFile(&pFile);

  if (code == 0) {
    code = taosRenameFile(tmpfile, curfile);
    if (code != 0) {
      // Reported once by the common failure log below.
      code = TAOS_SYSTEM_ERROR(errno);
    }
  }

  if (code != 0) {
    mError("failed to write sdb file:%s since %s", curfile, tstrerror(code));
  } else {
    pSdb->commitIndex = pSdb->applyIndex;
    pSdb->commitTerm = pSdb->applyTerm;
    pSdb->commitConfig = pSdb->applyConfig;
    mInfo("write sdb file success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
          pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig, curfile);
  }

  terrno = code;
  return code;
}

448
/*
 * Persist the sdb to disk when the apply index is ahead of the commit index
 * by at least `delta` entries; otherwise do nothing and return 0.
 *
 * Under pSdb->filelock the write is bracketed by walBeginSnapshot /
 * walEndSnapshot when a WAL is attached. Returns 0 on success or the first
 * non-zero code produced by one of those steps.
 */
int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) {
  const int64_t pending = pSdb->applyIndex - pSdb->commitIndex;
  if (pending == 0 || pending < delta) {
    return 0;
  }

  int32_t code = 0;
  taosThreadMutexLock(&pSdb->filelock);

  if (pSdb->pWal != NULL) {
    code = walBeginSnapshot(pSdb->pWal, pSdb->applyIndex);
  }

  if (code == 0) {
    code = sdbWriteFileImp(pSdb);
    if (code == 0 && pSdb->pWal != NULL) {
      code = walEndSnapshot(pSdb->pWal);
    }
  }

  if (code != 0) {
    mError("failed to write sdb file since %s", terrstr());
  }

  taosThreadMutexUnlock(&pSdb->filelock);
  return code;
}

S
Shengliang Guan 已提交
477
/*
 * Run all deploy callbacks and then write the sdb file (delta 0).
 * Returns 0 when both steps succeed, -1 otherwise; the file is not written
 * when deployment fails.
 */
int32_t sdbDeploy(SSdb *pSdb) {
  const bool ok = (sdbDeployData(pSdb) == 0) && (sdbWriteFile(pSdb, 0) == 0);
  return ok ? 0 : -1;
}
S
Shengliang Guan 已提交
488

S
Shengliang Guan 已提交
489
/*
 * Allocate a zeroed snapshot iterator and give it a file name of the form
 * <tmpDir>/sdb.data.<iterator address>.
 *
 * Returns the iterator, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when an
 * allocation fails. No file is opened here.
 */
static SSdbIter *sdbCreateIter(SSdb *pSdb) {
  SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter));
  if (pIter == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // The iterator's own address makes the temp file name unique per iterator.
  char path[PATH_MAX + 100] = {0};
  snprintf(path, sizeof(path), "%s%ssdb.data.%" PRIu64, pSdb->tmpDir, TD_DIRSEP, (uint64_t)pIter);

  char *pathCopy = strdup(path);
  if (pathCopy == NULL) {
    taosMemoryFree(pIter);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pIter->name = pathCopy;
  return pIter;
}

S
Shengliang Guan 已提交
508
/*
 * Release a snapshot iterator: close its file if open, remove the file named
 * by the iterator (the removal result is ignored), free the name and then the
 * iterator itself. A NULL iterator is a no-op.
 */
static void sdbCloseIter(SSdbIter *pIter) {
  if (pIter != NULL) {
    if (pIter->file != NULL) {
      taosCloseFile(&pIter->file);
      pIter->file = NULL;
    }

    if (pIter->name != NULL) {
      (void)taosRemoveFile(pIter->name);
      taosMemoryFree(pIter->name);
      pIter->name = NULL;
    }

    mInfo("sdbiter:%p, is closed, total:%" PRId64, pIter, pIter->total);
    taosMemoryFree(pIter);
  }
}

526
/*
 * Begin reading a snapshot of the sdb.
 *
 * Under pSdb->filelock the current commit index/term/config are sampled and
 * <currDir>/sdb.data is copied to the iterator's private file; that copy is
 * then opened for reading.
 *
 * ppIter receives the new iterator on success.
 * index / term / config are optional outputs (may be NULL) for the sampled
 * commit index, term and config.
 *
 * Returns 0 on success, -1 with terrno set on failure (the iterator is
 * released and *ppIter is left untouched).
 */
int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter, int64_t *index, int64_t *term, int64_t *config) {
  SSdbIter *pIter = sdbCreateIter(pSdb);
  if (pIter == NULL) return -1;

  char datafile[PATH_MAX] = {0};
  snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);

  taosThreadMutexLock(&pSdb->filelock);
  int64_t commitIndex = pSdb->commitIndex;
  int64_t commitTerm = pSdb->commitTerm;
  int64_t commitConfig = pSdb->commitConfig;
  if (taosCopyFile(datafile, pIter->name) < 0) {
    // Capture errno before any other call gets a chance to overwrite it.
    terrno = TAOS_SYSTEM_ERROR(errno);
    taosThreadMutexUnlock(&pSdb->filelock);
    mError("failed to copy sdb file %s to %s since %s", datafile, pIter->name, terrstr());
    sdbCloseIter(pIter);
    return -1;
  }
  taosThreadMutexUnlock(&pSdb->filelock);

  pIter->file = taosOpenFile(pIter->name, TD_FILE_READ);
  if (pIter->file == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to open sdb file:%s since %s", pIter->name, terrstr());
    sdbCloseIter(pIter);
    return -1;
  }

  *ppIter = pIter;
  if (index != NULL) *index = commitIndex;
  if (term != NULL) *term = commitTerm;
  if (config != NULL) *config = commitConfig;

  mInfo("sdbiter:%p, is created to read snapshot, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
        pIter, commitIndex, commitTerm, commitConfig, pIter->name);
  return 0;
}

S
Shengliang Guan 已提交
564 565 566 567
/*
 * Finish a snapshot read by releasing the iterator. pSdb is not used.
 * Always returns 0.
 */
int32_t sdbStopRead(SSdb *pSdb, SSdbIter *pIter) {
  (void)pSdb;

  sdbCloseIter(pIter);
  return 0;
}
S
Shengliang Guan 已提交
568

S
Shengliang Guan 已提交
569
/*
 * Read the next chunk (at most 4096 bytes) of a snapshot.
 *
 * On success with data: *ppBuf is a newly allocated buffer that this function
 * does not free, *len is the number of bytes read, and 0 is returned.
 * At end of file: *ppBuf = NULL, *len = 0, and 0 is returned.
 * On failure: *ppBuf = NULL, *len = 0, terrno is set, and -1 is returned
 * (an allocation failure returns -1 without touching the outputs).
 */
int32_t sdbDoRead(SSdb *pSdb, SSdbIter *pIter, void **ppBuf, int32_t *len) {
  const int32_t chunkSize = 4096;

  void *pChunk = taosMemoryCalloc(1, chunkSize);
  if (pChunk == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int32_t nread = taosReadFile(pIter->file, pChunk, chunkSize);

  // Normal case: hand the chunk over to the caller.
  if (nread > 0 && nread <= chunkSize) {
    pIter->total += nread;
    mInfo("sdbiter:%p, read:%d bytes from snapshot, total:%" PRId64, pIter, nread, pIter->total);
    *ppBuf = pChunk;
    *len = nread;
    return 0;
  }

  // Either end of file or an error: nothing is returned to the caller.
  int32_t code = 0;
  if (nread == 0) {
    mInfo("sdbiter:%p, read snapshot to the end, total:%" PRId64, pIter, pIter->total);
  } else {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("sdbiter:%p, failed to read snapshot since %s, total:%" PRId64, pIter, terrstr(), pIter->total);
    code = -1;
  }

  *ppBuf = NULL;
  *len = 0;
  taosMemoryFree(pChunk);
  return code;
}
S
Shengliang Guan 已提交
599

S
Shengliang Guan 已提交
600 601 602
/*
 * Begin receiving a snapshot: create an iterator and open its private file
 * for writing (created and truncated).
 *
 * Returns 0 and stores the iterator in *ppIter on success; returns -1 with
 * terrno set on failure, leaving *ppIter untouched.
 */
int32_t sdbStartWrite(SSdb *pSdb, SSdbIter **ppIter) {
  SSdbIter *pNew = sdbCreateIter(pSdb);
  if (pNew == NULL) return -1;

  pNew->file = taosOpenFile(pNew->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pNew->file != NULL) {
    *ppIter = pNew;
    mInfo("sdbiter:%p, is created to write snapshot, file:%s", pNew, pNew->name);
    return 0;
  }

  terrno = TAOS_SYSTEM_ERROR(errno);
  mError("failed to open %s since %s", pNew->name, terrstr());
  sdbCloseIter(pNew);
  return -1;
}
S
Shengliang Guan 已提交
616

617
/*
 * Finish receiving a snapshot.
 *
 * When isApply is false the iterator is released and 0 is returned.
 * Otherwise the received file is fsynced, closed, renamed over
 * <currDir>/sdb.data and loaded with sdbReadFile. Positive index / term /
 * config values are asserted to match what was loaded and are stored as the
 * commit values.
 *
 * The iterator is always released before returning.
 * Returns 0 on success, -1 with terrno set when syncing, renaming or loading
 * fails.
 */
int32_t sdbStopWrite(SSdb *pSdb, SSdbIter *pIter, bool isApply, int64_t index, int64_t term, int64_t config) {
  if (!isApply) {
    mInfo("sdbiter:%p, not apply to sdb", pIter);
    sdbCloseIter(pIter);
    return 0;
  }

  // Do not install a snapshot whose content may not have reached the disk.
  if (taosFsyncFile(pIter->file) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("sdbiter:%p, failed to fsync file %s since %s", pIter, pIter->name, terrstr());
    sdbCloseIter(pIter);
    return -1;
  }
  taosCloseFile(&pIter->file);
  pIter->file = NULL;

  char datafile[PATH_MAX] = {0};
  snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
  if (taosRenameFile(pIter->name, datafile) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("sdbiter:%p, failed to rename file %s to %s since %s", pIter, pIter->name, datafile, terrstr());
    sdbCloseIter(pIter);
    return -1;
  }

  if (sdbReadFile(pSdb) != 0) {
    mError("sdbiter:%p, failed to read from %s since %s", pIter, datafile, terrstr());
    sdbCloseIter(pIter);
    return -1;
  }

  if (config > 0) {
    ASSERT(pSdb->commitConfig == config);
    pSdb->commitConfig = config;
  }
  if (term > 0) {
    ASSERT(pSdb->commitTerm == term);
    pSdb->commitTerm = term;
  }
  if (index > 0) {
    ASSERT(pSdb->commitIndex == index);
    pSdb->commitIndex = index;
  }

  mInfo("sdbiter:%p, success applyed to sdb", pIter);
  sdbCloseIter(pIter);
  return 0;
}

/*
 * Append `len` bytes from pBuf to the snapshot file of pIter and add them to
 * the iterator's running total.
 *
 * Returns 0 when all bytes were written, -1 with terrno set otherwise (the
 * total is not updated in that case).
 */
int32_t sdbDoWrite(SSdb *pSdb, SSdbIter *pIter, void *pBuf, int32_t len) {
  const int32_t nwritten = taosWriteFile(pIter->file, pBuf, len);

  if (nwritten == len) {
    pIter->total += nwritten;
    mInfo("sdbiter:%p, write:%d bytes to snapshot, total:%" PRId64, pIter, nwritten, pIter->total);
    return 0;
  }

  terrno = TAOS_SYSTEM_ERROR(errno);
  mError("failed to write len:%d since %s, total:%" PRId64, len, terrstr(), pIter->total);
  return -1;
}