sdbFile.c 16.6 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "sdb.h"
S
Shengliang Guan 已提交
18
#include "tchecksum.h"
S
Shengliang Guan 已提交
19
#include "wal.h"
S
Shengliang Guan 已提交
20

S
Shengliang Guan 已提交
21
#define SDB_TABLE_SIZE   24
S
Shengliang Guan 已提交
22
#define SDB_RESERVE_SIZE 512
S
Shengliang Guan 已提交
23
#define SDB_FILE_VER     1
S
Shengliang Guan 已提交
24

S
Shengliang Guan 已提交
25
static int32_t sdbDeployData(SSdb *pSdb) {
S
Shengliang Guan 已提交
26
  mDebug("start to deploy sdb");
S
Shengliang Guan 已提交
27

S
Shengliang Guan 已提交
28
  for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
S
Shengliang Guan 已提交
29
    SdbDeployFp fp = pSdb->deployFps[i];
S
Shengliang Guan 已提交
30
    if (fp == NULL) continue;
S
Shengliang Guan 已提交
31

S
Shengliang Guan 已提交
32
    mDebug("start to deploy sdb:%s", sdbTableName(i));
S
Shengliang Guan 已提交
33
    if ((*fp)(pSdb->pMnode) != 0) {
34
      mError("failed to deploy sdb:%s since %s", sdbTableName(i), terrstr());
S
Shengliang Guan 已提交
35
      return -1;
S
Shengliang Guan 已提交
36 37 38
    }
  }

S
Shengliang Guan 已提交
39
  mDebug("sdb deploy successfully");
S
Shengliang Guan 已提交
40 41 42
  return 0;
}

S
Shengliang Guan 已提交
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
static void sdbResetData(SSdb *pSdb) {
  mDebug("start to reset sdb");

  for (ESdbType i = 0; i < SDB_MAX; ++i) {
    SHashObj *hash = pSdb->hashObjs[i];
    if (hash == NULL) continue;

    SSdbRow **ppRow = taosHashIterate(hash, NULL);
    while (ppRow != NULL) {
      SSdbRow *pRow = *ppRow;
      if (pRow == NULL) continue;

      sdbFreeRow(pSdb, pRow, true);
      ppRow = taosHashIterate(hash, ppRow);
    }
  }

  for (ESdbType i = 0; i < SDB_MAX; ++i) {
    SHashObj *hash = pSdb->hashObjs[i];
    if (hash == NULL) continue;

    taosHashClear(pSdb->hashObjs[i]);
    pSdb->tableVer[i] = 0;
    pSdb->maxId[i] = 0;
    mDebug("sdb:%s is reset", sdbTableName(i));
  }

  pSdb->curVer = -1;
  pSdb->curTerm = -1;
  pSdb->lastCommitVer = -1;
73
  pSdb->lastCommitTerm = -1;
S
Shengliang Guan 已提交
74 75 76
  mDebug("sdb reset successfully");
}

77
static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
S
Shengliang Guan 已提交
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
  int64_t sver = 0;
  int32_t ret = taosReadFile(pFile, &sver, sizeof(int64_t));
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(int64_t)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }
  if (sver != SDB_FILE_VER) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

  ret = taosReadFile(pFile, &pSdb->curVer, sizeof(int64_t));
S
Shengliang Guan 已提交
94 95 96 97 98 99 100 101 102
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(int64_t)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

103 104 105 106 107 108 109 110 111 112
  ret = taosReadFile(pFile, &pSdb->curTerm, sizeof(int64_t));
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(int64_t)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

S
Shengliang Guan 已提交
113
  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
S
Shengliang Guan 已提交
114
    int64_t maxId = 0;
115
    ret = taosReadFile(pFile, &maxId, sizeof(int64_t));
S
Shengliang Guan 已提交
116 117 118 119 120 121 122 123 124 125 126 127 128 129
    if (ret < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
    if (ret != sizeof(int64_t)) {
      terrno = TSDB_CODE_FILE_CORRUPTED;
      return -1;
    }
    if (i < SDB_MAX) {
      pSdb->maxId[i] = maxId;
    }
  }

  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
S
Shengliang Guan 已提交
130
    int64_t ver = 0;
131
    ret = taosReadFile(pFile, &ver, sizeof(int64_t));
S
Shengliang Guan 已提交
132 133 134 135 136 137 138 139 140 141 142 143 144 145
    if (ret < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
    if (ret != sizeof(int64_t)) {
      terrno = TSDB_CODE_FILE_CORRUPTED;
      return -1;
    }
    if (i < SDB_MAX) {
      pSdb->tableVer[i] = ver;
    }
  }

  char reserve[SDB_RESERVE_SIZE] = {0};
146
  ret = taosReadFile(pFile, reserve, sizeof(reserve));
S
Shengliang Guan 已提交
147 148 149 150 151 152 153 154 155 156 157 158
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(reserve)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

  return 0;
}

159
static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
S
Shengliang Guan 已提交
160 161 162 163 164 165
  int64_t sver = SDB_FILE_VER;
  if (taosWriteFile(pFile, &sver, sizeof(int64_t)) != sizeof(int64_t)) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

166
  if (taosWriteFile(pFile, &pSdb->curVer, sizeof(int64_t)) != sizeof(int64_t)) {
S
Shengliang Guan 已提交
167 168 169 170
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

171 172 173 174 175
  if (taosWriteFile(pFile, &pSdb->curTerm, sizeof(int64_t)) != sizeof(int64_t)) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

S
Shengliang Guan 已提交
176
  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
S
Shengliang Guan 已提交
177
    int64_t maxId = 0;
S
Shengliang Guan 已提交
178 179 180
    if (i < SDB_MAX) {
      maxId = pSdb->maxId[i];
    }
181
    if (taosWriteFile(pFile, &maxId, sizeof(int64_t)) != sizeof(int64_t)) {
S
Shengliang Guan 已提交
182 183 184 185 186 187
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
  }

  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
S
Shengliang Guan 已提交
188
    int64_t ver = 0;
S
Shengliang Guan 已提交
189 190 191
    if (i < SDB_MAX) {
      ver = pSdb->tableVer[i];
    }
192
    if (taosWriteFile(pFile, &ver, sizeof(int64_t)) != sizeof(int64_t)) {
S
Shengliang Guan 已提交
193 194 195 196 197
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
  }

S
Shengliang Guan 已提交
198
  char reserve[SDB_RESERVE_SIZE] = {0};
199
  if (taosWriteFile(pFile, reserve, sizeof(reserve)) != sizeof(reserve)) {
S
Shengliang Guan 已提交
200 201 202 203 204 205 206
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
207
static int32_t sdbReadFileImp(SSdb *pSdb) {
S
Shengliang Guan 已提交
208 209 210 211
  int64_t offset = 0;
  int32_t code = 0;
  int32_t readLen = 0;
  int64_t ret = 0;
S
Shengliang Guan 已提交
212 213 214
  char    file[PATH_MAX] = {0};

  snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
215
  mDebug("start to read sdb file:%s", file);
S
Shengliang Guan 已提交
216

S
Shengliang Guan 已提交
217
  SSdbRaw *pRaw = taosMemoryMalloc(WAL_MAX_SIZE + 100);
S
Shengliang Guan 已提交
218
  if (pRaw == NULL) {
S
Shengliang Guan 已提交
219
    terrno = TSDB_CODE_OUT_OF_MEMORY;
220
    mError("failed read sdb file since %s", terrstr());
S
Shengliang Guan 已提交
221
    return -1;
S
Shengliang Guan 已提交
222 223
  }

224 225
  TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
wafwerar's avatar
wafwerar 已提交
226
    taosMemoryFree(pRaw);
S
Shengliang Guan 已提交
227
    terrno = TAOS_SYSTEM_ERROR(errno);
228
    mError("failed to read sdb file:%s since %s", file, terrstr());
S
Shengliang Guan 已提交
229
    return 0;
S
Shengliang Guan 已提交
230 231
  }

232
  if (sdbReadFileHead(pSdb, pFile) != 0) {
233
    mError("failed to read sdb file:%s head since %s", file, terrstr());
wafwerar's avatar
wafwerar 已提交
234
    taosMemoryFree(pRaw);
235
    taosCloseFile(&pFile);
S
Shengliang Guan 已提交
236 237 238
    return -1;
  }

S
Shengliang Guan 已提交
239 240 241
  int64_t tableVer[SDB_MAX] = {0};
  memcpy(tableVer, pSdb->tableVer, sizeof(tableVer));

S
Shengliang Guan 已提交
242
  while (1) {
S
Shengliang Guan 已提交
243
    readLen = sizeof(SSdbRaw);
244
    ret = taosReadFile(pFile, pRaw, readLen);
S
Shengliang Guan 已提交
245 246 247 248
    if (ret == 0) break;

    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
249
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
S
Shengliang Guan 已提交
250 251 252
      break;
    }

S
Shengliang Guan 已提交
253
    if (ret != readLen) {
S
Shengliang Guan 已提交
254
      code = TSDB_CODE_FILE_CORRUPTED;
255
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
S
Shengliang Guan 已提交
256 257 258
      break;
    }

S
Shengliang Guan 已提交
259
    readLen = pRaw->dataLen + sizeof(int32_t);
260
    ret = taosReadFile(pFile, pRaw->pData, readLen);
S
Shengliang Guan 已提交
261 262
    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
263
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
S
Shengliang Guan 已提交
264 265 266
      break;
    }

S
Shengliang Guan 已提交
267
    if (ret != readLen) {
S
Shengliang Guan 已提交
268
      code = TSDB_CODE_FILE_CORRUPTED;
269
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
S
Shengliang Guan 已提交
270 271 272
      break;
    }

S
Shengliang Guan 已提交
273
    int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t);
274
    if ((!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen)) != 0) {
S
Shengliang Guan 已提交
275
      code = TSDB_CODE_CHECKSUM_ERROR;
276
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
S
Shengliang Guan 已提交
277 278 279
      break;
    }

280
    code = sdbWriteWithoutFree(pSdb, pRaw);
S
Shengliang Guan 已提交
281
    if (code != 0) {
282
      mError("failed to read sdb file:%s since %s", file, terrstr());
S
Shengliang Guan 已提交
283
      goto _OVER;
S
Shengliang Guan 已提交
284 285 286 287
    }
  }

  code = 0;
S
Shengliang Guan 已提交
288
  pSdb->lastCommitVer = pSdb->curVer;
289
  pSdb->lastCommitTerm = pSdb->curTerm;
S
Shengliang Guan 已提交
290
  memcpy(pSdb->tableVer, tableVer, sizeof(tableVer));
291 292
  mDebug("read sdb file:%s successfully, ver:%" PRId64 " term:%" PRId64, file, pSdb->lastCommitVer,
         pSdb->lastCommitTerm);
S
Shengliang Guan 已提交
293

S
Shengliang Guan 已提交
294
_OVER:
295
  taosCloseFile(&pFile);
S
Shengliang Guan 已提交
296
  sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
297

S
Shengliang Guan 已提交
298
  terrno = code;
S
Shengliang Guan 已提交
299 300 301
  return code;
}

S
Shengliang Guan 已提交
302 303 304 305 306 307
int32_t sdbReadFile(SSdb *pSdb) {
  taosThreadMutexLock(&pSdb->filelock);

  sdbResetData(pSdb);
  int32_t code = sdbReadFileImp(pSdb);
  if (code != 0) {
308
    mError("failed to read sdb file since %s", terrstr());
S
Shengliang Guan 已提交
309 310 311 312 313 314 315
    sdbResetData(pSdb);
  }

  taosThreadMutexUnlock(&pSdb->filelock);
  return code;
}

S
Shengliang Guan 已提交
316
static int32_t sdbWriteFileImp(SSdb *pSdb) {
S
Shengliang Guan 已提交
317 318
  int32_t code = 0;

S
Shengliang Guan 已提交
319
  char tmpfile[PATH_MAX] = {0};
S
Shengliang Guan 已提交
320
  snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
S
Shengliang Guan 已提交
321
  char curfile[PATH_MAX] = {0};
S
Shengliang Guan 已提交
322
  snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
S
Shengliang Guan 已提交
323

324 325 326
  mDebug("start to write sdb file, current ver:%" PRId64 " term:%" PRId64 ", commit ver:%" PRId64 " term:%" PRId64
         " file:%s",
         pSdb->curVer, pSdb->curTerm, pSdb->lastCommitVer, pSdb->lastCommitTerm, curfile);
S
Shengliang Guan 已提交
327

328
  TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
329
  if (pFile == NULL) {
S
Shengliang Guan 已提交
330
    terrno = TAOS_SYSTEM_ERROR(errno);
331
    mError("failed to open sdb file:%s for write since %s", tmpfile, terrstr());
S
Shengliang Guan 已提交
332
    return -1;
S
Shengliang Guan 已提交
333 334
  }

335
  if (sdbWriteFileHead(pSdb, pFile) != 0) {
336
    mError("failed to write sdb file:%s head since %s", tmpfile, terrstr());
337
    taosCloseFile(&pFile);
S
Shengliang Guan 已提交
338 339 340 341
    return -1;
  }

  for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
S
Shengliang Guan 已提交
342
    SdbEncodeFp encodeFp = pSdb->encodeFps[i];
S
Shengliang Guan 已提交
343 344
    if (encodeFp == NULL) continue;

345
    mTrace("write %s to sdb file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));
S
Shengliang Guan 已提交
346

S
Shengliang Guan 已提交
347
    SHashObj       *hash = pSdb->hashObjs[i];
348 349
    TdThreadRwlock *pLock = &pSdb->locks[i];
    taosThreadRwlockWrlock(pLock);
S
Shengliang Guan 已提交
350

S
Shengliang Guan 已提交
351 352 353
    SSdbRow **ppRow = taosHashIterate(hash, NULL);
    while (ppRow != NULL) {
      SSdbRow *pRow = *ppRow;
354 355 356 357 358 359 360
      if (pRow == NULL) {
        ppRow = taosHashIterate(hash, ppRow);
        continue;
      }

      if (pRow->status != SDB_STATUS_READY && pRow->status != SDB_STATUS_DROPPING) {
        sdbPrintOper(pSdb, pRow, "not-write");
S
Shengliang Guan 已提交
361 362 363
        ppRow = taosHashIterate(hash, ppRow);
        continue;
      }
S
Shengliang Guan 已提交
364

365
      sdbPrintOper(pSdb, pRow, "write");
S
Shengliang Guan 已提交
366

S
Shengliang Guan 已提交
367 368
      SSdbRaw *pRaw = (*encodeFp)(pRow->pObj);
      if (pRaw != NULL) {
S
Shengliang Guan 已提交
369
        pRaw->status = pRow->status;
S
Shengliang Guan 已提交
370
        int32_t writeLen = sizeof(SSdbRaw) + pRaw->dataLen;
371
        if (taosWriteFile(pFile, pRaw, writeLen) != writeLen) {
S
Shengliang Guan 已提交
372
          code = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
373
          taosHashCancelIterate(hash, ppRow);
S
Shengliang Guan 已提交
374
          sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
375 376
          break;
        }
S
Shengliang Guan 已提交
377 378

        int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
379
        if (taosWriteFile(pFile, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
S
Shengliang Guan 已提交
380
          code = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
381
          taosHashCancelIterate(hash, ppRow);
S
Shengliang Guan 已提交
382
          sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
383 384
          break;
        }
S
Shengliang Guan 已提交
385 386
      } else {
        code = TSDB_CODE_SDB_APP_ERROR;
S
Shengliang Guan 已提交
387
        taosHashCancelIterate(hash, ppRow);
S
Shengliang Guan 已提交
388 389 390
        break;
      }

S
Shengliang Guan 已提交
391
      sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
392
      ppRow = taosHashIterate(hash, ppRow);
S
Shengliang Guan 已提交
393
    }
394
    taosThreadRwlockUnlock(pLock);
S
Shengliang Guan 已提交
395 396 397
  }

  if (code == 0) {
398
    code = taosFsyncFile(pFile);
S
Shengliang Guan 已提交
399 400
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
401
      mError("failed to sync sdb file:%s since %s", tmpfile, tstrerror(code));
S
Shengliang Guan 已提交
402
    }
S
Shengliang Guan 已提交
403 404
  }

405
  taosCloseFile(&pFile);
S
Shengliang Guan 已提交
406

S
Shengliang Guan 已提交
407
  if (code == 0) {
S
Shengliang Guan 已提交
408
    code = taosRenameFile(tmpfile, curfile);
S
Shengliang Guan 已提交
409 410
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
411
      mError("failed to write sdb file:%s since %s", curfile, tstrerror(code));
S
Shengliang Guan 已提交
412
    }
S
Shengliang Guan 已提交
413 414 415
  }

  if (code != 0) {
416
    mError("failed to write sdb file:%s since %s", curfile, tstrerror(code));
S
Shengliang Guan 已提交
417
  } else {
S
Shengliang Guan 已提交
418
    pSdb->lastCommitVer = pSdb->curVer;
419 420 421
    pSdb->lastCommitTerm = pSdb->curTerm;
    mDebug("write sdb file successfully, ver:%" PRId64 " term:%" PRId64 " file:%s", pSdb->lastCommitVer,
           pSdb->lastCommitTerm, curfile);
S
Shengliang Guan 已提交
422 423
  }

S
Shengliang Guan 已提交
424
  terrno = code;
S
Shengliang Guan 已提交
425 426 427
  return code;
}

S
Shengliang Guan 已提交
428 429 430 431 432
int32_t sdbWriteFile(SSdb *pSdb) {
  if (pSdb->curVer == pSdb->lastCommitVer) {
    return 0;
  }

S
Shengliang Guan 已提交
433 434 435
  taosThreadMutexLock(&pSdb->filelock);
  int32_t code = sdbWriteFileImp(pSdb);
  if (code != 0) {
436
    mError("failed to write sdb file since %s", terrstr());
S
Shengliang Guan 已提交
437 438 439
  }
  taosThreadMutexUnlock(&pSdb->filelock);
  return code;
S
Shengliang Guan 已提交
440 441
}

S
Shengliang Guan 已提交
442
int32_t sdbDeploy(SSdb *pSdb) {
S
Shengliang Guan 已提交
443
  if (sdbDeployData(pSdb) != 0) {
S
Shengliang Guan 已提交
444
    return -1;
S
Shengliang Guan 已提交
445 446
  }

S
Shengliang Guan 已提交
447
  if (sdbWriteFile(pSdb) != 0) {
S
Shengliang Guan 已提交
448
    return -1;
S
Shengliang Guan 已提交
449 450 451 452
  }

  return 0;
}
S
Shengliang Guan 已提交
453

S
Shengliang Guan 已提交
454
static SSdbIter *sdbCreateIter(SSdb *pSdb) {
S
Shengliang Guan 已提交
455 456 457 458 459 460
  SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter));
  if (pIter == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

S
Shengliang Guan 已提交
461 462 463 464
  char name[PATH_MAX + 100] = {0};
  snprintf(name, sizeof(name), "%s%ssdb.data.%" PRIu64, pSdb->tmpDir, TD_DIRSEP, (uint64_t)pIter);
  pIter->name = strdup(name);
  if (pIter->name == NULL) {
S
Shengliang Guan 已提交
465
    taosMemoryFree(pIter);
S
Shengliang Guan 已提交
466
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
467 468 469 470 471 472
    return NULL;
  }

  return pIter;
}

S
Shengliang Guan 已提交
473
static void sdbCloseIter(SSdbIter *pIter) {
S
Shengliang Guan 已提交
474
  if (pIter == NULL) return;
S
Shengliang Guan 已提交
475

S
Shengliang Guan 已提交
476 477
  if (pIter->file != NULL) {
    taosCloseFile(&pIter->file);
S
Shengliang Guan 已提交
478
    pIter->file = NULL;
S
Shengliang Guan 已提交
479
  }
S
Shengliang Guan 已提交
480

S
Shengliang Guan 已提交
481 482 483 484 485
  if (pIter->name != NULL) {
    taosRemoveFile(pIter->name);
    taosMemoryFree(pIter->name);
    pIter->name = NULL;
  }
S
Shengliang Guan 已提交
486

S
Shengliang Guan 已提交
487
  mInfo("sdbiter:%p, is closed, total:%" PRId64, pIter, pIter->total);
S
Shengliang Guan 已提交
488 489 490
  taosMemoryFree(pIter);
}

S
Shengliang Guan 已提交
491 492 493
int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) {
  SSdbIter *pIter = sdbCreateIter(pSdb);
  if (pIter == NULL) return -1;
S
Shengliang Guan 已提交
494

S
Shengliang Guan 已提交
495 496 497 498
  char datafile[PATH_MAX] = {0};
  snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);

  taosThreadMutexLock(&pSdb->filelock);
S
Shengliang Guan 已提交
499 500
  int64_t commitIndex = pSdb->lastCommitVer;
  int64_t commitTerm = pSdb->lastCommitTerm;
S
Shengliang Guan 已提交
501 502 503
  if (taosCopyFile(datafile, pIter->name) < 0) {
    taosThreadMutexUnlock(&pSdb->filelock);
    terrno = TAOS_SYSTEM_ERROR(errno);
504
    mError("failed to copy sdb file %s to %s since %s", datafile, pIter->name, terrstr());
S
Shengliang Guan 已提交
505 506
    sdbCloseIter(pIter);
    return -1;
S
Shengliang Guan 已提交
507
  }
S
Shengliang Guan 已提交
508
  taosThreadMutexUnlock(&pSdb->filelock);
S
Shengliang Guan 已提交
509

S
Shengliang Guan 已提交
510 511 512
  pIter->file = taosOpenFile(pIter->name, TD_FILE_READ);
  if (pIter->file == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
513
    mError("failed to open sdb file:%s since %s", pIter->name, terrstr());
S
Shengliang Guan 已提交
514 515 516 517 518
    sdbCloseIter(pIter);
    return -1;
  }

  *ppIter = pIter;
S
Shengliang Guan 已提交
519 520
  mInfo("sdbiter:%p, is created to read snapshot, index:%" PRId64 " term:%" PRId64 " file:%s", pIter, commitIndex,
        commitTerm, pIter->name);
S
Shengliang Guan 已提交
521
  return 0;
S
Shengliang Guan 已提交
522 523
}

S
Shengliang Guan 已提交
524 525 526 527
int32_t sdbStopRead(SSdb *pSdb, SSdbIter *pIter) {
  sdbCloseIter(pIter);
  return 0;
}
S
Shengliang Guan 已提交
528

S
Shengliang Guan 已提交
529
int32_t sdbDoRead(SSdb *pSdb, SSdbIter *pIter, void **ppBuf, int32_t *len) {
S
Shengliang Guan 已提交
530
  int32_t maxlen = 100;
S
Shengliang Guan 已提交
531
  void   *pBuf = taosMemoryCalloc(1, maxlen);
S
Shengliang Guan 已提交
532 533
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
534
    return -1;
S
Shengliang Guan 已提交
535 536 537
  }

  int32_t readlen = taosReadFile(pIter->file, pBuf, maxlen);
S
Shengliang Guan 已提交
538
  if (readlen < 0 || readlen > maxlen) {
S
Shengliang Guan 已提交
539
    terrno = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
540 541 542
    mError("sdbiter:%p, failed to read snapshot since %s, total:%" PRId64, pIter, terrstr(), pIter->total);
    *ppBuf = NULL;
    *len = 0;
S
Shengliang Guan 已提交
543
    taosMemoryFree(pBuf);
S
Shengliang Guan 已提交
544 545 546 547 548 549 550
    return -1;
  } else if (readlen == 0) {
    mInfo("sdbiter:%p, read snapshot to the end, total:%" PRId64, pIter, pIter->total);
    *ppBuf = NULL;
    *len = 0;
    taosMemoryFree(pBuf);
    return 0;
S
Shengliang Guan 已提交
551
  } else {  // (readlen <= maxlen)
S
Shengliang Guan 已提交
552 553 554 555 556
    pIter->total += readlen;
    mInfo("sdbiter:%p, read:%d bytes from snapshot, total:%" PRId64, pIter, readlen, pIter->total);
    *ppBuf = pBuf;
    *len = readlen;
    return 0;
S
Shengliang Guan 已提交
557 558
  }
}
S
Shengliang Guan 已提交
559

S
Shengliang Guan 已提交
560 561 562
int32_t sdbStartWrite(SSdb *pSdb, SSdbIter **ppIter) {
  SSdbIter *pIter = sdbCreateIter(pSdb);
  if (pIter == NULL) return -1;
S
Shengliang Guan 已提交
563

S
Shengliang Guan 已提交
564 565
  pIter->file = taosOpenFile(pIter->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pIter->file == NULL) {
S
Shengliang Guan 已提交
566
    terrno = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
567
    mError("failed to open %s since %s", pIter->name, terrstr());
S
Shengliang Guan 已提交
568 569 570
    return -1;
  }

S
Shengliang Guan 已提交
571 572 573 574
  *ppIter = pIter;
  mInfo("sdbiter:%p, is created to write snapshot, file:%s", pIter, pIter->name);
  return 0;
}
S
Shengliang Guan 已提交
575

S
Shengliang Guan 已提交
576 577 578 579 580 581 582
int32_t sdbStopWrite(SSdb *pSdb, SSdbIter *pIter, bool isApply) {
  int32_t code = 0;

  if (!isApply) {
    sdbCloseIter(pIter);
    mInfo("sdbiter:%p, not apply to sdb", pIter);
    return 0;
S
Shengliang Guan 已提交
583 584
  }

S
Shengliang Guan 已提交
585 586 587
  taosFsyncFile(pIter->file);
  taosCloseFile(&pIter->file);
  pIter->file = NULL;
S
Shengliang Guan 已提交
588

S
Shengliang Guan 已提交
589 590 591
  char datafile[PATH_MAX] = {0};
  snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
  if (taosRenameFile(pIter->name, datafile) != 0) {
S
Shengliang Guan 已提交
592
    terrno = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
593 594
    mError("sdbiter:%p, failed to rename file %s to %s since %s", pIter, pIter->name, datafile, terrstr());
    sdbCloseIter(pIter);
S
Shengliang Guan 已提交
595 596 597
    return -1;
  }

S
Shengliang Guan 已提交
598
  sdbCloseIter(pIter);
S
Shengliang Guan 已提交
599
  if (sdbReadFile(pSdb) != 0) {
S
Shengliang Guan 已提交
600 601 602 603 604 605 606 607 608 609 610 611 612
    mError("sdbiter:%p, failed to read from %s since %s", pIter, datafile, terrstr());
    return -1;
  }

  mInfo("sdbiter:%p, successfully applyed to sdb", pIter);
  return 0;
}

int32_t sdbDoWrite(SSdb *pSdb, SSdbIter *pIter, void *pBuf, int32_t len) {
  int32_t writelen = taosWriteFile(pIter->file, pBuf, len);
  if (writelen != len) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to write len:%d since %s, total:%" PRId64, len, terrstr(), pIter->total);
S
Shengliang Guan 已提交
613 614 615
    return -1;
  }

S
Shengliang Guan 已提交
616 617
  pIter->total += writelen;
  mInfo("sdbiter:%p, write:%d bytes to snapshot, total:%" PRId64, pIter, writelen, pIter->total);
S
Shengliang Guan 已提交
618 619
  return 0;
}