sdbFile.c 16.3 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "sdb.h"
S
Shengliang Guan 已提交
18
#include "tchecksum.h"
S
Shengliang Guan 已提交
19
#include "wal.h"
S
Shengliang Guan 已提交
20

S
Shengliang Guan 已提交
21
#define SDB_TABLE_SIZE   24
S
Shengliang Guan 已提交
22
#define SDB_RESERVE_SIZE 512
S
Shengliang Guan 已提交
23
#define SDB_FILE_VER     1
S
Shengliang Guan 已提交
24

S
Shengliang Guan 已提交
25
static int32_t sdbDeployData(SSdb *pSdb) {
S
Shengliang Guan 已提交
26
  mDebug("start to deploy sdb");
S
Shengliang Guan 已提交
27

S
Shengliang Guan 已提交
28
  for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
S
Shengliang Guan 已提交
29
    SdbDeployFp fp = pSdb->deployFps[i];
S
Shengliang Guan 已提交
30
    if (fp == NULL) continue;
S
Shengliang Guan 已提交
31

S
Shengliang Guan 已提交
32
    mDebug("start to deploy sdb:%s", sdbTableName(i));
S
Shengliang Guan 已提交
33
    if ((*fp)(pSdb->pMnode) != 0) {
34
      mError("failed to deploy sdb:%s since %s", sdbTableName(i), terrstr());
S
Shengliang Guan 已提交
35
      return -1;
S
Shengliang Guan 已提交
36 37 38
    }
  }

S
Shengliang Guan 已提交
39
  mDebug("sdb deploy successfully");
S
Shengliang Guan 已提交
40 41 42
  return 0;
}

S
Shengliang Guan 已提交
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
static void sdbResetData(SSdb *pSdb) {
  mDebug("start to reset sdb");

  for (ESdbType i = 0; i < SDB_MAX; ++i) {
    SHashObj *hash = pSdb->hashObjs[i];
    if (hash == NULL) continue;

    SSdbRow **ppRow = taosHashIterate(hash, NULL);
    while (ppRow != NULL) {
      SSdbRow *pRow = *ppRow;
      if (pRow == NULL) continue;

      sdbFreeRow(pSdb, pRow, true);
      ppRow = taosHashIterate(hash, ppRow);
    }
  }

  for (ESdbType i = 0; i < SDB_MAX; ++i) {
    SHashObj *hash = pSdb->hashObjs[i];
    if (hash == NULL) continue;

    taosHashClear(pSdb->hashObjs[i]);
    pSdb->tableVer[i] = 0;
    pSdb->maxId[i] = 0;
    mDebug("sdb:%s is reset", sdbTableName(i));
  }

  pSdb->curVer = -1;
  pSdb->curTerm = -1;
  pSdb->lastCommitVer = -1;
  mDebug("sdb reset successfully");
}

76
static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
S
Shengliang Guan 已提交
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
  int64_t sver = 0;
  int32_t ret = taosReadFile(pFile, &sver, sizeof(int64_t));
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(int64_t)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }
  if (sver != SDB_FILE_VER) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

  ret = taosReadFile(pFile, &pSdb->curVer, sizeof(int64_t));
S
Shengliang Guan 已提交
93 94 95 96 97 98 99 100 101
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(int64_t)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

102 103 104 105 106 107 108 109 110 111
  ret = taosReadFile(pFile, &pSdb->curTerm, sizeof(int64_t));
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(int64_t)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

S
Shengliang Guan 已提交
112
  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
S
Shengliang Guan 已提交
113
    int64_t maxId = 0;
114
    ret = taosReadFile(pFile, &maxId, sizeof(int64_t));
S
Shengliang Guan 已提交
115 116 117 118 119 120 121 122 123 124 125 126 127 128
    if (ret < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
    if (ret != sizeof(int64_t)) {
      terrno = TSDB_CODE_FILE_CORRUPTED;
      return -1;
    }
    if (i < SDB_MAX) {
      pSdb->maxId[i] = maxId;
    }
  }

  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
S
Shengliang Guan 已提交
129
    int64_t ver = 0;
130
    ret = taosReadFile(pFile, &ver, sizeof(int64_t));
S
Shengliang Guan 已提交
131 132 133 134 135 136 137 138 139 140 141 142 143 144
    if (ret < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
    if (ret != sizeof(int64_t)) {
      terrno = TSDB_CODE_FILE_CORRUPTED;
      return -1;
    }
    if (i < SDB_MAX) {
      pSdb->tableVer[i] = ver;
    }
  }

  char reserve[SDB_RESERVE_SIZE] = {0};
145
  ret = taosReadFile(pFile, reserve, sizeof(reserve));
S
Shengliang Guan 已提交
146 147 148 149 150 151 152 153 154 155 156 157
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(reserve)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

  return 0;
}

158
static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
S
Shengliang Guan 已提交
159 160 161 162 163 164
  int64_t sver = SDB_FILE_VER;
  if (taosWriteFile(pFile, &sver, sizeof(int64_t)) != sizeof(int64_t)) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

165
  if (taosWriteFile(pFile, &pSdb->curVer, sizeof(int64_t)) != sizeof(int64_t)) {
S
Shengliang Guan 已提交
166 167 168 169
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

170 171 172 173 174
  if (taosWriteFile(pFile, &pSdb->curTerm, sizeof(int64_t)) != sizeof(int64_t)) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

S
Shengliang Guan 已提交
175
  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
S
Shengliang Guan 已提交
176
    int64_t maxId = 0;
S
Shengliang Guan 已提交
177 178 179
    if (i < SDB_MAX) {
      maxId = pSdb->maxId[i];
    }
180
    if (taosWriteFile(pFile, &maxId, sizeof(int64_t)) != sizeof(int64_t)) {
S
Shengliang Guan 已提交
181 182 183 184 185 186
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
  }

  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
S
Shengliang Guan 已提交
187
    int64_t ver = 0;
S
Shengliang Guan 已提交
188 189 190
    if (i < SDB_MAX) {
      ver = pSdb->tableVer[i];
    }
191
    if (taosWriteFile(pFile, &ver, sizeof(int64_t)) != sizeof(int64_t)) {
S
Shengliang Guan 已提交
192 193 194 195 196
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
  }

S
Shengliang Guan 已提交
197
  char reserve[SDB_RESERVE_SIZE] = {0};
198
  if (taosWriteFile(pFile, reserve, sizeof(reserve)) != sizeof(reserve)) {
S
Shengliang Guan 已提交
199 200 201 202 203 204 205
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
206
static int32_t sdbReadFileImp(SSdb *pSdb) {
S
Shengliang Guan 已提交
207 208 209 210
  int64_t offset = 0;
  int32_t code = 0;
  int32_t readLen = 0;
  int64_t ret = 0;
S
Shengliang Guan 已提交
211 212 213 214
  char    file[PATH_MAX] = {0};

  snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
  mDebug("start to read file:%s", file);
S
Shengliang Guan 已提交
215

S
Shengliang Guan 已提交
216
  SSdbRaw *pRaw = taosMemoryMalloc(WAL_MAX_SIZE + 100);
S
Shengliang Guan 已提交
217
  if (pRaw == NULL) {
S
Shengliang Guan 已提交
218
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
219
    mError("failed read file since %s", terrstr());
S
Shengliang Guan 已提交
220
    return -1;
S
Shengliang Guan 已提交
221 222
  }

223 224
  TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
wafwerar's avatar
wafwerar 已提交
225
    taosMemoryFree(pRaw);
S
Shengliang Guan 已提交
226 227
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to read file:%s since %s", file, terrstr());
S
Shengliang Guan 已提交
228
    return 0;
S
Shengliang Guan 已提交
229 230
  }

231
  if (sdbReadFileHead(pSdb, pFile) != 0) {
S
Shengliang Guan 已提交
232
    mError("failed to read file:%s head since %s", file, terrstr());
wafwerar's avatar
wafwerar 已提交
233
    taosMemoryFree(pRaw);
234
    taosCloseFile(&pFile);
S
Shengliang Guan 已提交
235 236 237
    return -1;
  }

S
Shengliang Guan 已提交
238 239 240
  int64_t tableVer[SDB_MAX] = {0};
  memcpy(tableVer, pSdb->tableVer, sizeof(tableVer));

S
Shengliang Guan 已提交
241
  while (1) {
S
Shengliang Guan 已提交
242
    readLen = sizeof(SSdbRaw);
243
    ret = taosReadFile(pFile, pRaw, readLen);
S
Shengliang Guan 已提交
244 245 246 247 248 249 250 251
    if (ret == 0) break;

    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read file:%s since %s", file, tstrerror(code));
      break;
    }

S
Shengliang Guan 已提交
252
    if (ret != readLen) {
S
Shengliang Guan 已提交
253 254 255 256 257
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      break;
    }

S
Shengliang Guan 已提交
258
    readLen = pRaw->dataLen + sizeof(int32_t);
259
    ret = taosReadFile(pFile, pRaw->pData, readLen);
S
Shengliang Guan 已提交
260 261 262 263 264 265
    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read file:%s since %s", file, tstrerror(code));
      break;
    }

S
Shengliang Guan 已提交
266
    if (ret != readLen) {
S
Shengliang Guan 已提交
267 268 269 270 271
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      break;
    }

S
Shengliang Guan 已提交
272
    int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t);
273
    if ((!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen)) != 0) {
S
Shengliang Guan 已提交
274
      code = TSDB_CODE_CHECKSUM_ERROR;
S
Shengliang Guan 已提交
275 276 277 278
      mError("failed to read file:%s since %s", file, tstrerror(code));
      break;
    }

279
    code = sdbWriteWithoutFree(pSdb, pRaw);
S
Shengliang Guan 已提交
280
    if (code != 0) {
S
Shengliang Guan 已提交
281
      mError("failed to read file:%s since %s", file, terrstr());
S
Shengliang Guan 已提交
282
      goto _OVER;
S
Shengliang Guan 已提交
283 284 285 286
    }
  }

  code = 0;
S
Shengliang Guan 已提交
287
  pSdb->lastCommitVer = pSdb->curVer;
S
Shengliang Guan 已提交
288
  memcpy(pSdb->tableVer, tableVer, sizeof(tableVer));
S
Shengliang Guan 已提交
289
  mDebug("read file:%s successfully, ver:%" PRId64, file, pSdb->lastCommitVer);
S
Shengliang Guan 已提交
290

S
Shengliang Guan 已提交
291
_OVER:
292
  taosCloseFile(&pFile);
S
Shengliang Guan 已提交
293
  sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
294

S
Shengliang Guan 已提交
295
  terrno = code;
S
Shengliang Guan 已提交
296 297 298
  return code;
}

S
Shengliang Guan 已提交
299 300 301 302 303 304 305 306 307 308 309 310 311 312
int32_t sdbReadFile(SSdb *pSdb) {
  taosThreadMutexLock(&pSdb->filelock);

  sdbResetData(pSdb);
  int32_t code = sdbReadFileImp(pSdb);
  if (code != 0) {
    mError("failed to read sdb since %s", terrstr());
    sdbResetData(pSdb);
  }

  taosThreadMutexUnlock(&pSdb->filelock);
  return code;
}

S
Shengliang Guan 已提交
313
static int32_t sdbWriteFileImp(SSdb *pSdb) {
S
Shengliang Guan 已提交
314 315
  int32_t code = 0;

S
Shengliang Guan 已提交
316
  char tmpfile[PATH_MAX] = {0};
S
Shengliang Guan 已提交
317
  snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
S
Shengliang Guan 已提交
318
  char curfile[PATH_MAX] = {0};
S
Shengliang Guan 已提交
319
  snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
S
Shengliang Guan 已提交
320

321 322
  mDebug("start to write file:%s, current ver:%" PRId64 " term:%" PRId64 ", commit ver:%" PRId64, curfile, pSdb->curVer,
         pSdb->curTerm, pSdb->lastCommitVer);
S
Shengliang Guan 已提交
323

324
  TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
325
  if (pFile == NULL) {
S
Shengliang Guan 已提交
326 327 328
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to open file:%s for write since %s", tmpfile, terrstr());
    return -1;
S
Shengliang Guan 已提交
329 330
  }

331
  if (sdbWriteFileHead(pSdb, pFile) != 0) {
S
Shengliang Guan 已提交
332
    mError("failed to write file:%s head since %s", tmpfile, terrstr());
333
    taosCloseFile(&pFile);
S
Shengliang Guan 已提交
334 335 336 337
    return -1;
  }

  for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
S
Shengliang Guan 已提交
338
    SdbEncodeFp encodeFp = pSdb->encodeFps[i];
S
Shengliang Guan 已提交
339 340
    if (encodeFp == NULL) continue;

S
Shengliang Guan 已提交
341
    mTrace("write %s to file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));
S
Shengliang Guan 已提交
342

S
Shengliang Guan 已提交
343
    SHashObj       *hash = pSdb->hashObjs[i];
344 345
    TdThreadRwlock *pLock = &pSdb->locks[i];
    taosThreadRwlockWrlock(pLock);
S
Shengliang Guan 已提交
346

S
Shengliang Guan 已提交
347 348 349
    SSdbRow **ppRow = taosHashIterate(hash, NULL);
    while (ppRow != NULL) {
      SSdbRow *pRow = *ppRow;
350 351 352 353 354 355 356
      if (pRow == NULL) {
        ppRow = taosHashIterate(hash, ppRow);
        continue;
      }

      if (pRow->status != SDB_STATUS_READY && pRow->status != SDB_STATUS_DROPPING) {
        sdbPrintOper(pSdb, pRow, "not-write");
S
Shengliang Guan 已提交
357 358 359
        ppRow = taosHashIterate(hash, ppRow);
        continue;
      }
S
Shengliang Guan 已提交
360

361
      sdbPrintOper(pSdb, pRow, "write");
S
Shengliang Guan 已提交
362

S
Shengliang Guan 已提交
363 364
      SSdbRaw *pRaw = (*encodeFp)(pRow->pObj);
      if (pRaw != NULL) {
S
Shengliang Guan 已提交
365
        pRaw->status = pRow->status;
S
Shengliang Guan 已提交
366
        int32_t writeLen = sizeof(SSdbRaw) + pRaw->dataLen;
367
        if (taosWriteFile(pFile, pRaw, writeLen) != writeLen) {
S
Shengliang Guan 已提交
368
          code = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
369
          taosHashCancelIterate(hash, ppRow);
S
Shengliang Guan 已提交
370
          sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
371 372
          break;
        }
S
Shengliang Guan 已提交
373 374

        int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
375
        if (taosWriteFile(pFile, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
S
Shengliang Guan 已提交
376
          code = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
377
          taosHashCancelIterate(hash, ppRow);
S
Shengliang Guan 已提交
378
          sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
379 380
          break;
        }
S
Shengliang Guan 已提交
381 382
      } else {
        code = TSDB_CODE_SDB_APP_ERROR;
S
Shengliang Guan 已提交
383
        taosHashCancelIterate(hash, ppRow);
S
Shengliang Guan 已提交
384 385 386
        break;
      }

S
Shengliang Guan 已提交
387
      sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
388
      ppRow = taosHashIterate(hash, ppRow);
S
Shengliang Guan 已提交
389
    }
390
    taosThreadRwlockUnlock(pLock);
S
Shengliang Guan 已提交
391 392 393
  }

  if (code == 0) {
394
    code = taosFsyncFile(pFile);
S
Shengliang Guan 已提交
395 396
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
wafwerar's avatar
wafwerar 已提交
397
      mError("failed to sync file:%s since %s", tmpfile, tstrerror(code));
S
Shengliang Guan 已提交
398
    }
S
Shengliang Guan 已提交
399 400
  }

401
  taosCloseFile(&pFile);
S
Shengliang Guan 已提交
402

S
Shengliang Guan 已提交
403
  if (code == 0) {
S
Shengliang Guan 已提交
404
    code = taosRenameFile(tmpfile, curfile);
S
Shengliang Guan 已提交
405 406 407 408
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to write file:%s since %s", curfile, tstrerror(code));
    }
S
Shengliang Guan 已提交
409 410 411
  }

  if (code != 0) {
S
Shengliang Guan 已提交
412
    mError("failed to write file:%s since %s", curfile, tstrerror(code));
S
Shengliang Guan 已提交
413
  } else {
S
Shengliang Guan 已提交
414
    pSdb->lastCommitVer = pSdb->curVer;
415
    mDebug("write file:%s successfully, ver:%" PRId64 " term:%" PRId64, curfile, pSdb->lastCommitVer, pSdb->curTerm);
S
Shengliang Guan 已提交
416 417
  }

S
Shengliang Guan 已提交
418
  terrno = code;
S
Shengliang Guan 已提交
419 420 421
  return code;
}

S
Shengliang Guan 已提交
422 423 424 425 426
int32_t sdbWriteFile(SSdb *pSdb) {
  if (pSdb->curVer == pSdb->lastCommitVer) {
    return 0;
  }

S
Shengliang Guan 已提交
427 428 429 430 431 432 433
  taosThreadMutexLock(&pSdb->filelock);
  int32_t code = sdbWriteFileImp(pSdb);
  if (code != 0) {
    mError("failed to write sdb since %s", terrstr());
  }
  taosThreadMutexUnlock(&pSdb->filelock);
  return code;
S
Shengliang Guan 已提交
434 435
}

S
Shengliang Guan 已提交
436
int32_t sdbDeploy(SSdb *pSdb) {
S
Shengliang Guan 已提交
437
  if (sdbDeployData(pSdb) != 0) {
S
Shengliang Guan 已提交
438
    return -1;
S
Shengliang Guan 已提交
439 440
  }

S
Shengliang Guan 已提交
441
  if (sdbWriteFile(pSdb) != 0) {
S
Shengliang Guan 已提交
442
    return -1;
S
Shengliang Guan 已提交
443 444 445 446
  }

  return 0;
}
S
Shengliang Guan 已提交
447

S
Shengliang Guan 已提交
448
static SSdbIter *sdbOpenIter(SSdb *pSdb) {
S
Shengliang Guan 已提交
449 450 451
  char datafile[PATH_MAX] = {0};
  char tmpfile[PATH_MAX] = {0};
  snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
S
Shengliang Guan 已提交
452
  snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
S
Shengliang Guan 已提交
453

S
Shengliang Guan 已提交
454
  taosThreadMutexLock(&pSdb->filelock);
S
Shengliang Guan 已提交
455
  if (taosCopyFile(datafile, tmpfile) != 0) {
S
Shengliang Guan 已提交
456
    taosThreadMutexUnlock(&pSdb->filelock);
S
Shengliang Guan 已提交
457 458 459 460
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to copy file %s to %s since %s", datafile, tmpfile, terrstr());
    return NULL;
  }
S
Shengliang Guan 已提交
461
  taosThreadMutexUnlock(&pSdb->filelock);
S
Shengliang Guan 已提交
462 463 464 465 466 467 468 469 470 471

  SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter));
  if (pIter == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pIter->file = taosOpenFile(tmpfile, TD_FILE_READ);
  if (pIter->file == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
472
    mError("failed to read file:%s since %s", tmpfile, terrstr());
S
Shengliang Guan 已提交
473 474 475 476 477 478 479
    taosMemoryFree(pIter);
    return NULL;
  }

  return pIter;
}

S
Shengliang Guan 已提交
480
static void sdbCloseIter(SSdb *pSdb, SSdbIter *pIter) {
S
Shengliang Guan 已提交
481 482 483 484
  if (pIter == NULL) return;
  if (pIter->file != NULL) {
    taosCloseFile(&pIter->file);
  }
S
Shengliang Guan 已提交
485 486 487 488 489

  char tmpfile[PATH_MAX] = {0};
  snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
  taosRemoveFile(tmpfile);

S
Shengliang Guan 已提交
490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518
  taosMemoryFree(pIter);
  mInfo("sdbiter:%p, is closed", pIter);
}

static SSdbIter *sdbGetIter(SSdb *pSdb, SSdbIter **ppIter) {
  SSdbIter *pIter = NULL;
  if (ppIter != NULL) pIter = *ppIter;

  if (pIter == NULL) {
    pIter = sdbOpenIter(pSdb);
    if (pIter != NULL) {
      mInfo("sdbiter:%p, is created to read snapshot", pIter);
      *ppIter = pIter;
    } else {
      mError("failed to create sdbiter to read snapshot since %s", terrstr());
      *ppIter = NULL;
      return NULL;
    }
  } else {
    mInfo("sdbiter:%p, continue to read snapshot, total:%" PRId64, pIter, pIter->total);
  }

  return pIter;
}

int32_t sdbReadSnapshot(SSdb *pSdb, SSdbIter **ppIter, char **ppBuf, int32_t *len) {
  SSdbIter *pIter = sdbGetIter(pSdb, ppIter);
  if (pIter == NULL) return -1;

S
Shengliang Guan 已提交
519 520
  int32_t maxlen = 100;
  char   *pBuf = taosMemoryCalloc(1, maxlen);
S
Shengliang Guan 已提交
521 522
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
523
    sdbCloseIter(pSdb, pIter);
S
Shengliang Guan 已提交
524
    return -1;
S
Shengliang Guan 已提交
525 526 527
  }

  int32_t readlen = taosReadFile(pIter->file, pBuf, maxlen);
S
Shengliang Guan 已提交
528
  if (readlen < 0 || (readlen == 0 && errno != 0)) {
S
Shengliang Guan 已提交
529
    terrno = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
530 531 532 533
    mError("sdbiter:%p, failed to read snapshot since %s, total:%" PRId64, pIter, terrstr(), pIter->total);
    *ppBuf = NULL;
    *len = 0;
    *ppIter = NULL;
S
Shengliang Guan 已提交
534
    sdbCloseIter(pSdb, pIter);
S
Shengliang Guan 已提交
535
    taosMemoryFree(pBuf);
S
Shengliang Guan 已提交
536 537 538 539 540 541
    return -1;
  } else if (readlen == 0) {
    mInfo("sdbiter:%p, read snapshot to the end, total:%" PRId64, pIter, pIter->total);
    *ppBuf = NULL;
    *len = 0;
    *ppIter = NULL;
S
Shengliang Guan 已提交
542
    sdbCloseIter(pSdb, pIter);
S
Shengliang Guan 已提交
543 544 545 546 547 548 549 550 551 552
    taosMemoryFree(pBuf);
    return 0;
  } else if ((readlen < maxlen && errno != 0) || readlen == maxlen) {
    pIter->total += readlen;
    mInfo("sdbiter:%p, read:%d bytes from snapshot, total:%" PRId64, pIter, readlen, pIter->total);
    *ppBuf = pBuf;
    *len = readlen;
    return 0;
  } else if (readlen < maxlen && errno == 0) {
    mInfo("sdbiter:%p, read snapshot to the end, total:%" PRId64, pIter, pIter->total);
S
Shengliang Guan 已提交
553
    *ppBuf = pBuf;
S
Shengliang Guan 已提交
554 555
    *len = readlen;
    *ppIter = NULL;
S
Shengliang Guan 已提交
556
    sdbCloseIter(pSdb, pIter);
S
Shengliang Guan 已提交
557 558 559 560 561 562 563
    return 0;
  } else {
    // impossible
    mError("sdbiter:%p, read:%d bytes from snapshot, total:%" PRId64, pIter, readlen, pIter->total);
    *ppBuf = NULL;
    *len = 0;
    *ppIter = NULL;
S
Shengliang Guan 已提交
564
    sdbCloseIter(pSdb, pIter);
S
Shengliang Guan 已提交
565 566
    taosMemoryFree(pBuf);
    return -1;
S
Shengliang Guan 已提交
567 568
  }
}
S
Shengliang Guan 已提交
569 570 571 572 573

int32_t sdbApplySnapshot(SSdb *pSdb, char *pBuf, int32_t len) {
  char datafile[PATH_MAX] = {0};
  char tmpfile[PATH_MAX] = {0};
  snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
S
Shengliang Guan 已提交
574
  snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
S
Shengliang Guan 已提交
575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612

  TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to write %s since %s", tmpfile, terrstr());
    return -1;
  }

  int32_t writelen = taosWriteFile(pFile, pBuf, len);
  if (writelen != len) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to write %s since %s", tmpfile, terrstr());
    taosCloseFile(&pFile);
    return -1;
  }

  if (taosFsyncFile(pFile) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to fsync %s since %s", tmpfile, terrstr());
    taosCloseFile(&pFile);
    return -1;
  }

  (void)taosCloseFile(&pFile);

  if (taosRenameFile(tmpfile, datafile) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to rename file %s to %s since %s", tmpfile, datafile, terrstr());
    return -1;
  }

  if (sdbReadFile(pSdb) != 0) {
    mError("failed to read from %s since %s", datafile, terrstr());
    return -1;
  }

  return 0;
}