sdbFile.c 17.3 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "sdb.h"
S
Shengliang Guan 已提交
18
#include "tchecksum.h"
S
Shengliang Guan 已提交
19
#include "wal.h"
S
Shengliang Guan 已提交
20

S
Shengliang Guan 已提交
21
#define SDB_TABLE_SIZE   24
S
Shengliang Guan 已提交
22
#define SDB_RESERVE_SIZE 512
S
Shengliang Guan 已提交
23
#define SDB_FILE_VER     1
S
Shengliang Guan 已提交
24

S
Shengliang Guan 已提交
25
static int32_t sdbDeployData(SSdb *pSdb) {
S
Shengliang Guan 已提交
26
  mDebug("start to deploy sdb");
S
Shengliang Guan 已提交
27

S
Shengliang Guan 已提交
28
  for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
S
Shengliang Guan 已提交
29
    SdbDeployFp fp = pSdb->deployFps[i];
S
Shengliang Guan 已提交
30
    if (fp == NULL) continue;
S
Shengliang Guan 已提交
31

S
Shengliang Guan 已提交
32
    mDebug("start to deploy sdb:%s", sdbTableName(i));
S
Shengliang Guan 已提交
33
    if ((*fp)(pSdb->pMnode) != 0) {
34
      mError("failed to deploy sdb:%s since %s", sdbTableName(i), terrstr());
S
Shengliang Guan 已提交
35
      return -1;
S
Shengliang Guan 已提交
36 37 38
    }
  }

S
Shengliang Guan 已提交
39
  mDebug("sdb deploy successfully");
S
Shengliang Guan 已提交
40 41 42
  return 0;
}

S
Shengliang Guan 已提交
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
static void sdbResetData(SSdb *pSdb) {
  mDebug("start to reset sdb");

  for (ESdbType i = 0; i < SDB_MAX; ++i) {
    SHashObj *hash = pSdb->hashObjs[i];
    if (hash == NULL) continue;

    SSdbRow **ppRow = taosHashIterate(hash, NULL);
    while (ppRow != NULL) {
      SSdbRow *pRow = *ppRow;
      if (pRow == NULL) continue;

      sdbFreeRow(pSdb, pRow, true);
      ppRow = taosHashIterate(hash, ppRow);
    }
  }

  for (ESdbType i = 0; i < SDB_MAX; ++i) {
    SHashObj *hash = pSdb->hashObjs[i];
    if (hash == NULL) continue;

    taosHashClear(pSdb->hashObjs[i]);
    pSdb->tableVer[i] = 0;
    pSdb->maxId[i] = 0;
    mDebug("sdb:%s is reset", sdbTableName(i));
  }

  pSdb->curVer = -1;
  pSdb->curTerm = -1;
  pSdb->lastCommitVer = -1;
73
  pSdb->lastCommitTerm = -1;
S
Shengliang Guan 已提交
74 75 76
  mDebug("sdb reset successfully");
}

77
static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
S
Shengliang Guan 已提交
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
  int64_t sver = 0;
  int32_t ret = taosReadFile(pFile, &sver, sizeof(int64_t));
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(int64_t)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }
  if (sver != SDB_FILE_VER) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

  ret = taosReadFile(pFile, &pSdb->curVer, sizeof(int64_t));
S
Shengliang Guan 已提交
94 95 96 97 98 99 100 101 102
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(int64_t)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

103 104 105 106 107 108 109 110 111 112
  ret = taosReadFile(pFile, &pSdb->curTerm, sizeof(int64_t));
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(int64_t)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

S
Shengliang Guan 已提交
113 114 115 116 117 118 119 120 121 122
  ret = taosReadFile(pFile, &pSdb->curConfig, sizeof(int64_t));
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(int64_t)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

S
Shengliang Guan 已提交
123
  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
S
Shengliang Guan 已提交
124
    int64_t maxId = 0;
125
    ret = taosReadFile(pFile, &maxId, sizeof(int64_t));
S
Shengliang Guan 已提交
126 127 128 129 130 131 132 133 134 135 136 137 138 139
    if (ret < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
    if (ret != sizeof(int64_t)) {
      terrno = TSDB_CODE_FILE_CORRUPTED;
      return -1;
    }
    if (i < SDB_MAX) {
      pSdb->maxId[i] = maxId;
    }
  }

  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
S
Shengliang Guan 已提交
140
    int64_t ver = 0;
141
    ret = taosReadFile(pFile, &ver, sizeof(int64_t));
S
Shengliang Guan 已提交
142 143 144 145 146 147 148 149 150 151 152 153 154 155
    if (ret < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
    if (ret != sizeof(int64_t)) {
      terrno = TSDB_CODE_FILE_CORRUPTED;
      return -1;
    }
    if (i < SDB_MAX) {
      pSdb->tableVer[i] = ver;
    }
  }

  char reserve[SDB_RESERVE_SIZE] = {0};
156
  ret = taosReadFile(pFile, reserve, sizeof(reserve));
S
Shengliang Guan 已提交
157 158 159 160 161 162 163 164 165 166 167 168
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(reserve)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

  return 0;
}

169
static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
S
Shengliang Guan 已提交
170 171 172 173 174 175
  int64_t sver = SDB_FILE_VER;
  if (taosWriteFile(pFile, &sver, sizeof(int64_t)) != sizeof(int64_t)) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

176
  if (taosWriteFile(pFile, &pSdb->curVer, sizeof(int64_t)) != sizeof(int64_t)) {
S
Shengliang Guan 已提交
177 178 179 180
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

181 182 183 184 185
  if (taosWriteFile(pFile, &pSdb->curTerm, sizeof(int64_t)) != sizeof(int64_t)) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

S
Shengliang Guan 已提交
186 187 188 189 190
  if (taosWriteFile(pFile, &pSdb->curConfig, sizeof(int64_t)) != sizeof(int64_t)) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

S
Shengliang Guan 已提交
191
  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
S
Shengliang Guan 已提交
192
    int64_t maxId = 0;
S
Shengliang Guan 已提交
193 194 195
    if (i < SDB_MAX) {
      maxId = pSdb->maxId[i];
    }
196
    if (taosWriteFile(pFile, &maxId, sizeof(int64_t)) != sizeof(int64_t)) {
S
Shengliang Guan 已提交
197 198 199 200 201 202
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
  }

  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
S
Shengliang Guan 已提交
203
    int64_t ver = 0;
S
Shengliang Guan 已提交
204 205 206
    if (i < SDB_MAX) {
      ver = pSdb->tableVer[i];
    }
207
    if (taosWriteFile(pFile, &ver, sizeof(int64_t)) != sizeof(int64_t)) {
S
Shengliang Guan 已提交
208 209 210 211 212
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
  }

S
Shengliang Guan 已提交
213
  char reserve[SDB_RESERVE_SIZE] = {0};
214
  if (taosWriteFile(pFile, reserve, sizeof(reserve)) != sizeof(reserve)) {
S
Shengliang Guan 已提交
215 216 217 218 219 220 221
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
222
static int32_t sdbReadFileImp(SSdb *pSdb) {
S
Shengliang Guan 已提交
223 224 225 226
  int64_t offset = 0;
  int32_t code = 0;
  int32_t readLen = 0;
  int64_t ret = 0;
S
Shengliang Guan 已提交
227 228 229
  char    file[PATH_MAX] = {0};

  snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
230
  mDebug("start to read sdb file:%s", file);
S
Shengliang Guan 已提交
231

S
Shengliang Guan 已提交
232
  SSdbRaw *pRaw = taosMemoryMalloc(WAL_MAX_SIZE + 100);
S
Shengliang Guan 已提交
233
  if (pRaw == NULL) {
S
Shengliang Guan 已提交
234
    terrno = TSDB_CODE_OUT_OF_MEMORY;
235
    mError("failed read sdb file since %s", terrstr());
S
Shengliang Guan 已提交
236
    return -1;
S
Shengliang Guan 已提交
237 238
  }

239 240
  TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
wafwerar's avatar
wafwerar 已提交
241
    taosMemoryFree(pRaw);
S
Shengliang Guan 已提交
242
    terrno = TAOS_SYSTEM_ERROR(errno);
243
    mError("failed to read sdb file:%s since %s", file, terrstr());
S
Shengliang Guan 已提交
244
    return 0;
S
Shengliang Guan 已提交
245 246
  }

247
  if (sdbReadFileHead(pSdb, pFile) != 0) {
248
    mError("failed to read sdb file:%s head since %s", file, terrstr());
wafwerar's avatar
wafwerar 已提交
249
    taosMemoryFree(pRaw);
250
    taosCloseFile(&pFile);
S
Shengliang Guan 已提交
251 252 253
    return -1;
  }

S
Shengliang Guan 已提交
254 255 256
  int64_t tableVer[SDB_MAX] = {0};
  memcpy(tableVer, pSdb->tableVer, sizeof(tableVer));

S
Shengliang Guan 已提交
257
  while (1) {
S
Shengliang Guan 已提交
258
    readLen = sizeof(SSdbRaw);
259
    ret = taosReadFile(pFile, pRaw, readLen);
S
Shengliang Guan 已提交
260 261 262 263
    if (ret == 0) break;

    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
264
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
S
Shengliang Guan 已提交
265 266 267
      break;
    }

S
Shengliang Guan 已提交
268
    if (ret != readLen) {
S
Shengliang Guan 已提交
269
      code = TSDB_CODE_FILE_CORRUPTED;
270
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
S
Shengliang Guan 已提交
271 272 273
      break;
    }

S
Shengliang Guan 已提交
274
    readLen = pRaw->dataLen + sizeof(int32_t);
275
    ret = taosReadFile(pFile, pRaw->pData, readLen);
S
Shengliang Guan 已提交
276 277
    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
278
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
S
Shengliang Guan 已提交
279 280 281
      break;
    }

S
Shengliang Guan 已提交
282
    if (ret != readLen) {
S
Shengliang Guan 已提交
283
      code = TSDB_CODE_FILE_CORRUPTED;
284
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
S
Shengliang Guan 已提交
285 286 287
      break;
    }

S
Shengliang Guan 已提交
288
    int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t);
289
    if ((!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen)) != 0) {
S
Shengliang Guan 已提交
290
      code = TSDB_CODE_CHECKSUM_ERROR;
291
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
S
Shengliang Guan 已提交
292 293 294
      break;
    }

295
    code = sdbWriteWithoutFree(pSdb, pRaw);
S
Shengliang Guan 已提交
296
    if (code != 0) {
297
      mError("failed to read sdb file:%s since %s", file, terrstr());
S
Shengliang Guan 已提交
298
      goto _OVER;
S
Shengliang Guan 已提交
299 300 301 302
    }
  }

  code = 0;
S
Shengliang Guan 已提交
303
  pSdb->lastCommitVer = pSdb->curVer;
304
  pSdb->lastCommitTerm = pSdb->curTerm;
S
Shengliang Guan 已提交
305
  memcpy(pSdb->tableVer, tableVer, sizeof(tableVer));
S
Shengliang Guan 已提交
306 307
  mDebug("read sdb file:%s successfully, index:%" PRId64 " term:%" PRId64 " config:%" PRId64, file, pSdb->lastCommitVer,
         pSdb->lastCommitTerm, pSdb->curConfig);
S
Shengliang Guan 已提交
308

S
Shengliang Guan 已提交
309
_OVER:
310
  taosCloseFile(&pFile);
S
Shengliang Guan 已提交
311
  sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
312

S
Shengliang Guan 已提交
313
  terrno = code;
S
Shengliang Guan 已提交
314 315 316
  return code;
}

S
Shengliang Guan 已提交
317 318 319 320 321 322
int32_t sdbReadFile(SSdb *pSdb) {
  taosThreadMutexLock(&pSdb->filelock);

  sdbResetData(pSdb);
  int32_t code = sdbReadFileImp(pSdb);
  if (code != 0) {
323
    mError("failed to read sdb file since %s", terrstr());
S
Shengliang Guan 已提交
324 325 326 327 328 329 330
    sdbResetData(pSdb);
  }

  taosThreadMutexUnlock(&pSdb->filelock);
  return code;
}

S
Shengliang Guan 已提交
331
static int32_t sdbWriteFileImp(SSdb *pSdb) {
S
Shengliang Guan 已提交
332 333
  int32_t code = 0;

S
Shengliang Guan 已提交
334
  char tmpfile[PATH_MAX] = {0};
S
Shengliang Guan 已提交
335
  snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
S
Shengliang Guan 已提交
336
  char curfile[PATH_MAX] = {0};
S
Shengliang Guan 已提交
337
  snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
S
Shengliang Guan 已提交
338

339 340 341
  mDebug("start to write sdb file, current ver:%" PRId64 " term:%" PRId64 ", commit ver:%" PRId64 " term:%" PRId64
         " file:%s",
         pSdb->curVer, pSdb->curTerm, pSdb->lastCommitVer, pSdb->lastCommitTerm, curfile);
S
Shengliang Guan 已提交
342

343
  TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
344
  if (pFile == NULL) {
S
Shengliang Guan 已提交
345
    terrno = TAOS_SYSTEM_ERROR(errno);
346
    mError("failed to open sdb file:%s for write since %s", tmpfile, terrstr());
S
Shengliang Guan 已提交
347
    return -1;
S
Shengliang Guan 已提交
348 349
  }

350
  if (sdbWriteFileHead(pSdb, pFile) != 0) {
351
    mError("failed to write sdb file:%s head since %s", tmpfile, terrstr());
352
    taosCloseFile(&pFile);
S
Shengliang Guan 已提交
353 354 355 356
    return -1;
  }

  for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
S
Shengliang Guan 已提交
357
    SdbEncodeFp encodeFp = pSdb->encodeFps[i];
S
Shengliang Guan 已提交
358 359
    if (encodeFp == NULL) continue;

360
    mTrace("write %s to sdb file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));
S
Shengliang Guan 已提交
361

S
Shengliang Guan 已提交
362
    SHashObj       *hash = pSdb->hashObjs[i];
363 364
    TdThreadRwlock *pLock = &pSdb->locks[i];
    taosThreadRwlockWrlock(pLock);
S
Shengliang Guan 已提交
365

S
Shengliang Guan 已提交
366 367 368
    SSdbRow **ppRow = taosHashIterate(hash, NULL);
    while (ppRow != NULL) {
      SSdbRow *pRow = *ppRow;
369 370 371 372 373 374 375
      if (pRow == NULL) {
        ppRow = taosHashIterate(hash, ppRow);
        continue;
      }

      if (pRow->status != SDB_STATUS_READY && pRow->status != SDB_STATUS_DROPPING) {
        sdbPrintOper(pSdb, pRow, "not-write");
S
Shengliang Guan 已提交
376 377 378
        ppRow = taosHashIterate(hash, ppRow);
        continue;
      }
S
Shengliang Guan 已提交
379

380
      sdbPrintOper(pSdb, pRow, "write");
S
Shengliang Guan 已提交
381

S
Shengliang Guan 已提交
382 383
      SSdbRaw *pRaw = (*encodeFp)(pRow->pObj);
      if (pRaw != NULL) {
S
Shengliang Guan 已提交
384
        pRaw->status = pRow->status;
S
Shengliang Guan 已提交
385
        int32_t writeLen = sizeof(SSdbRaw) + pRaw->dataLen;
386
        if (taosWriteFile(pFile, pRaw, writeLen) != writeLen) {
S
Shengliang Guan 已提交
387
          code = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
388
          taosHashCancelIterate(hash, ppRow);
S
Shengliang Guan 已提交
389
          sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
390 391
          break;
        }
S
Shengliang Guan 已提交
392 393

        int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
394
        if (taosWriteFile(pFile, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
S
Shengliang Guan 已提交
395
          code = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
396
          taosHashCancelIterate(hash, ppRow);
S
Shengliang Guan 已提交
397
          sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
398 399
          break;
        }
S
Shengliang Guan 已提交
400 401
      } else {
        code = TSDB_CODE_SDB_APP_ERROR;
S
Shengliang Guan 已提交
402
        taosHashCancelIterate(hash, ppRow);
S
Shengliang Guan 已提交
403 404 405
        break;
      }

S
Shengliang Guan 已提交
406
      sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
407
      ppRow = taosHashIterate(hash, ppRow);
S
Shengliang Guan 已提交
408
    }
409
    taosThreadRwlockUnlock(pLock);
S
Shengliang Guan 已提交
410 411 412
  }

  if (code == 0) {
413
    code = taosFsyncFile(pFile);
S
Shengliang Guan 已提交
414 415
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
416
      mError("failed to sync sdb file:%s since %s", tmpfile, tstrerror(code));
S
Shengliang Guan 已提交
417
    }
S
Shengliang Guan 已提交
418 419
  }

420
  taosCloseFile(&pFile);
S
Shengliang Guan 已提交
421

S
Shengliang Guan 已提交
422
  if (code == 0) {
S
Shengliang Guan 已提交
423
    code = taosRenameFile(tmpfile, curfile);
S
Shengliang Guan 已提交
424 425
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
426
      mError("failed to write sdb file:%s since %s", curfile, tstrerror(code));
S
Shengliang Guan 已提交
427
    }
S
Shengliang Guan 已提交
428 429 430
  }

  if (code != 0) {
431
    mError("failed to write sdb file:%s since %s", curfile, tstrerror(code));
S
Shengliang Guan 已提交
432
  } else {
S
Shengliang Guan 已提交
433
    pSdb->lastCommitVer = pSdb->curVer;
434 435 436
    pSdb->lastCommitTerm = pSdb->curTerm;
    mDebug("write sdb file successfully, ver:%" PRId64 " term:%" PRId64 " file:%s", pSdb->lastCommitVer,
           pSdb->lastCommitTerm, curfile);
S
Shengliang Guan 已提交
437 438
  }

S
Shengliang Guan 已提交
439
  terrno = code;
S
Shengliang Guan 已提交
440 441 442
  return code;
}

S
Shengliang Guan 已提交
443
int32_t sdbWriteFile(SSdb *pSdb) {
444
  int32_t code = 0;
S
Shengliang Guan 已提交
445 446 447 448
  if (pSdb->curVer == pSdb->lastCommitVer) {
    return 0;
  }

S
Shengliang Guan 已提交
449
  taosThreadMutexLock(&pSdb->filelock);
450 451 452 453 454 455 456 457 458 459 460
  if (pSdb->pWal != NULL) {
    code = walBeginSnapshot(pSdb->pWal, pSdb->curVer);
  }
  if (code == 0) {
    code = sdbWriteFileImp(pSdb);
  }
  if (code == 0) {
    if (pSdb->pWal != NULL) {
      code = walEndSnapshot(pSdb->pWal);
    }
  }
S
Shengliang Guan 已提交
461
  if (code != 0) {
462
    mError("failed to write sdb file since %s", terrstr());
S
Shengliang Guan 已提交
463 464 465
  }
  taosThreadMutexUnlock(&pSdb->filelock);
  return code;
S
Shengliang Guan 已提交
466 467
}

S
Shengliang Guan 已提交
468
int32_t sdbDeploy(SSdb *pSdb) {
S
Shengliang Guan 已提交
469
  if (sdbDeployData(pSdb) != 0) {
S
Shengliang Guan 已提交
470
    return -1;
S
Shengliang Guan 已提交
471 472
  }

S
Shengliang Guan 已提交
473
  if (sdbWriteFile(pSdb) != 0) {
S
Shengliang Guan 已提交
474
    return -1;
S
Shengliang Guan 已提交
475 476 477 478
  }

  return 0;
}
S
Shengliang Guan 已提交
479

S
Shengliang Guan 已提交
480
static SSdbIter *sdbCreateIter(SSdb *pSdb) {
S
Shengliang Guan 已提交
481 482 483 484 485 486
  SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter));
  if (pIter == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

S
Shengliang Guan 已提交
487 488 489 490
  char name[PATH_MAX + 100] = {0};
  snprintf(name, sizeof(name), "%s%ssdb.data.%" PRIu64, pSdb->tmpDir, TD_DIRSEP, (uint64_t)pIter);
  pIter->name = strdup(name);
  if (pIter->name == NULL) {
S
Shengliang Guan 已提交
491
    taosMemoryFree(pIter);
S
Shengliang Guan 已提交
492
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
493 494 495 496 497 498
    return NULL;
  }

  return pIter;
}

S
Shengliang Guan 已提交
499
static void sdbCloseIter(SSdbIter *pIter) {
S
Shengliang Guan 已提交
500
  if (pIter == NULL) return;
S
Shengliang Guan 已提交
501

S
Shengliang Guan 已提交
502 503
  if (pIter->file != NULL) {
    taosCloseFile(&pIter->file);
S
Shengliang Guan 已提交
504
    pIter->file = NULL;
S
Shengliang Guan 已提交
505
  }
S
Shengliang Guan 已提交
506

S
Shengliang Guan 已提交
507 508 509 510 511
  if (pIter->name != NULL) {
    taosRemoveFile(pIter->name);
    taosMemoryFree(pIter->name);
    pIter->name = NULL;
  }
S
Shengliang Guan 已提交
512

S
Shengliang Guan 已提交
513
  mInfo("sdbiter:%p, is closed, total:%" PRId64, pIter, pIter->total);
S
Shengliang Guan 已提交
514 515 516
  taosMemoryFree(pIter);
}

S
Shengliang Guan 已提交
517 518 519
int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) {
  SSdbIter *pIter = sdbCreateIter(pSdb);
  if (pIter == NULL) return -1;
S
Shengliang Guan 已提交
520

S
Shengliang Guan 已提交
521 522 523 524
  char datafile[PATH_MAX] = {0};
  snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);

  taosThreadMutexLock(&pSdb->filelock);
S
Shengliang Guan 已提交
525 526
  int64_t commitIndex = pSdb->lastCommitVer;
  int64_t commitTerm = pSdb->lastCommitTerm;
S
Shengliang Guan 已提交
527
  int64_t curConfig = pSdb->curConfig;
S
Shengliang Guan 已提交
528 529 530
  if (taosCopyFile(datafile, pIter->name) < 0) {
    taosThreadMutexUnlock(&pSdb->filelock);
    terrno = TAOS_SYSTEM_ERROR(errno);
531
    mError("failed to copy sdb file %s to %s since %s", datafile, pIter->name, terrstr());
S
Shengliang Guan 已提交
532 533
    sdbCloseIter(pIter);
    return -1;
S
Shengliang Guan 已提交
534
  }
S
Shengliang Guan 已提交
535
  taosThreadMutexUnlock(&pSdb->filelock);
S
Shengliang Guan 已提交
536

S
Shengliang Guan 已提交
537 538 539
  pIter->file = taosOpenFile(pIter->name, TD_FILE_READ);
  if (pIter->file == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
540
    mError("failed to open sdb file:%s since %s", pIter->name, terrstr());
S
Shengliang Guan 已提交
541 542 543 544 545
    sdbCloseIter(pIter);
    return -1;
  }

  *ppIter = pIter;
S
Shengliang Guan 已提交
546 547
  mInfo("sdbiter:%p, is created to read snapshot, index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s", pIter,
        commitIndex, commitTerm, curConfig, pIter->name);
S
Shengliang Guan 已提交
548
  return 0;
S
Shengliang Guan 已提交
549 550
}

S
Shengliang Guan 已提交
551 552 553 554
int32_t sdbStopRead(SSdb *pSdb, SSdbIter *pIter) {
  sdbCloseIter(pIter);
  return 0;
}
S
Shengliang Guan 已提交
555

S
Shengliang Guan 已提交
556
int32_t sdbDoRead(SSdb *pSdb, SSdbIter *pIter, void **ppBuf, int32_t *len) {
S
Shengliang Guan 已提交
557
  int32_t maxlen = 100;
S
Shengliang Guan 已提交
558
  void   *pBuf = taosMemoryCalloc(1, maxlen);
S
Shengliang Guan 已提交
559 560
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
561
    return -1;
S
Shengliang Guan 已提交
562 563 564
  }

  int32_t readlen = taosReadFile(pIter->file, pBuf, maxlen);
S
Shengliang Guan 已提交
565
  if (readlen < 0 || readlen > maxlen) {
S
Shengliang Guan 已提交
566
    terrno = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
567 568 569
    mError("sdbiter:%p, failed to read snapshot since %s, total:%" PRId64, pIter, terrstr(), pIter->total);
    *ppBuf = NULL;
    *len = 0;
S
Shengliang Guan 已提交
570
    taosMemoryFree(pBuf);
S
Shengliang Guan 已提交
571 572 573 574 575 576 577
    return -1;
  } else if (readlen == 0) {
    mInfo("sdbiter:%p, read snapshot to the end, total:%" PRId64, pIter, pIter->total);
    *ppBuf = NULL;
    *len = 0;
    taosMemoryFree(pBuf);
    return 0;
S
Shengliang Guan 已提交
578
  } else {  // (readlen <= maxlen)
S
Shengliang Guan 已提交
579 580 581 582 583
    pIter->total += readlen;
    mInfo("sdbiter:%p, read:%d bytes from snapshot, total:%" PRId64, pIter, readlen, pIter->total);
    *ppBuf = pBuf;
    *len = readlen;
    return 0;
S
Shengliang Guan 已提交
584 585
  }
}
S
Shengliang Guan 已提交
586

S
Shengliang Guan 已提交
587 588 589
int32_t sdbStartWrite(SSdb *pSdb, SSdbIter **ppIter) {
  SSdbIter *pIter = sdbCreateIter(pSdb);
  if (pIter == NULL) return -1;
S
Shengliang Guan 已提交
590

S
Shengliang Guan 已提交
591 592
  pIter->file = taosOpenFile(pIter->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pIter->file == NULL) {
S
Shengliang Guan 已提交
593
    terrno = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
594
    mError("failed to open %s since %s", pIter->name, terrstr());
S
Shengliang Guan 已提交
595 596 597
    return -1;
  }

S
Shengliang Guan 已提交
598 599 600 601
  *ppIter = pIter;
  mInfo("sdbiter:%p, is created to write snapshot, file:%s", pIter, pIter->name);
  return 0;
}
S
Shengliang Guan 已提交
602

S
Shengliang Guan 已提交
603 604 605 606 607 608 609
int32_t sdbStopWrite(SSdb *pSdb, SSdbIter *pIter, bool isApply) {
  int32_t code = 0;

  if (!isApply) {
    sdbCloseIter(pIter);
    mInfo("sdbiter:%p, not apply to sdb", pIter);
    return 0;
S
Shengliang Guan 已提交
610 611
  }

S
Shengliang Guan 已提交
612 613 614
  taosFsyncFile(pIter->file);
  taosCloseFile(&pIter->file);
  pIter->file = NULL;
S
Shengliang Guan 已提交
615

S
Shengliang Guan 已提交
616 617 618
  char datafile[PATH_MAX] = {0};
  snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
  if (taosRenameFile(pIter->name, datafile) != 0) {
S
Shengliang Guan 已提交
619
    terrno = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
620 621
    mError("sdbiter:%p, failed to rename file %s to %s since %s", pIter, pIter->name, datafile, terrstr());
    sdbCloseIter(pIter);
S
Shengliang Guan 已提交
622 623 624
    return -1;
  }

S
Shengliang Guan 已提交
625
  sdbCloseIter(pIter);
S
Shengliang Guan 已提交
626
  if (sdbReadFile(pSdb) != 0) {
S
Shengliang Guan 已提交
627 628 629 630 631 632 633 634 635 636 637 638 639
    mError("sdbiter:%p, failed to read from %s since %s", pIter, datafile, terrstr());
    return -1;
  }

  mInfo("sdbiter:%p, successfully applyed to sdb", pIter);
  return 0;
}

int32_t sdbDoWrite(SSdb *pSdb, SSdbIter *pIter, void *pBuf, int32_t len) {
  int32_t writelen = taosWriteFile(pIter->file, pBuf, len);
  if (writelen != len) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to write len:%d since %s, total:%" PRId64, len, terrstr(), pIter->total);
S
Shengliang Guan 已提交
640 641 642
    return -1;
  }

S
Shengliang Guan 已提交
643 644
  pIter->total += writelen;
  mInfo("sdbiter:%p, write:%d bytes to snapshot, total:%" PRId64, pIter, writelen, pIter->total);
S
Shengliang Guan 已提交
645 646
  return 0;
}