sdbFile.c 19.0 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "sdb.h"
18
#include "sync.h"
S
Shengliang Guan 已提交
19
#include "tchecksum.h"
S
Shengliang Guan 已提交
20
#include "wal.h"
S
Shengliang Guan 已提交
21

S
Shengliang Guan 已提交
22
#define SDB_TABLE_SIZE   24
S
Shengliang Guan 已提交
23
#define SDB_RESERVE_SIZE 512
S
Shengliang Guan 已提交
24
#define SDB_FILE_VER     1
S
Shengliang Guan 已提交
25

S
Shengliang Guan 已提交
26
static int32_t sdbDeployData(SSdb *pSdb) {
27
  mInfo("start to deploy sdb");
S
Shengliang Guan 已提交
28

S
Shengliang Guan 已提交
29
  for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
S
Shengliang Guan 已提交
30
    SdbDeployFp fp = pSdb->deployFps[i];
S
Shengliang Guan 已提交
31
    if (fp == NULL) continue;
S
Shengliang Guan 已提交
32

33
    mInfo("start to deploy sdb:%s", sdbTableName(i));
S
Shengliang Guan 已提交
34
    if ((*fp)(pSdb->pMnode) != 0) {
35
      mError("failed to deploy sdb:%s since %s", sdbTableName(i), terrstr());
S
Shengliang Guan 已提交
36
      return -1;
S
Shengliang Guan 已提交
37 38 39
    }
  }

40
  mInfo("sdb deploy success");
S
Shengliang Guan 已提交
41 42 43
  return 0;
}

S
Shengliang Guan 已提交
44
static void sdbResetData(SSdb *pSdb) {
45
  mInfo("start to reset sdb");
S
Shengliang Guan 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67

  for (ESdbType i = 0; i < SDB_MAX; ++i) {
    SHashObj *hash = pSdb->hashObjs[i];
    if (hash == NULL) continue;

    SSdbRow **ppRow = taosHashIterate(hash, NULL);
    while (ppRow != NULL) {
      SSdbRow *pRow = *ppRow;
      if (pRow == NULL) continue;

      sdbFreeRow(pSdb, pRow, true);
      ppRow = taosHashIterate(hash, ppRow);
    }
  }

  for (ESdbType i = 0; i < SDB_MAX; ++i) {
    SHashObj *hash = pSdb->hashObjs[i];
    if (hash == NULL) continue;

    taosHashClear(pSdb->hashObjs[i]);
    pSdb->tableVer[i] = 0;
    pSdb->maxId[i] = 0;
68
    mInfo("sdb:%s is reset", sdbTableName(i));
S
Shengliang Guan 已提交
69 70
  }

S
Shengliang Guan 已提交
71 72 73 74 75 76
  pSdb->applyIndex = -1;
  pSdb->applyTerm = -1;
  pSdb->applyConfig = -1;
  pSdb->commitIndex = -1;
  pSdb->commitTerm = -1;
  pSdb->commitConfig = -1;
77
  mInfo("sdb reset success");
S
Shengliang Guan 已提交
78 79
}

80
static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
S
Shengliang Guan 已提交
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
  int64_t sver = 0;
  int32_t ret = taosReadFile(pFile, &sver, sizeof(int64_t));
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(int64_t)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }
  if (sver != SDB_FILE_VER) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

S
Shengliang Guan 已提交
96
  ret = taosReadFile(pFile, &pSdb->applyIndex, sizeof(int64_t));
S
Shengliang Guan 已提交
97 98 99 100 101 102 103 104 105
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(int64_t)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

S
Shengliang Guan 已提交
106
  ret = taosReadFile(pFile, &pSdb->applyTerm, sizeof(int64_t));
107 108 109 110 111 112 113 114 115
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(int64_t)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

S
Shengliang Guan 已提交
116
  ret = taosReadFile(pFile, &pSdb->applyConfig, sizeof(int64_t));
S
Shengliang Guan 已提交
117 118 119 120 121 122 123 124 125
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(int64_t)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

S
Shengliang Guan 已提交
126
  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
S
Shengliang Guan 已提交
127
    int64_t maxId = 0;
128
    ret = taosReadFile(pFile, &maxId, sizeof(int64_t));
S
Shengliang Guan 已提交
129 130 131 132 133 134 135 136 137 138 139 140 141 142
    if (ret < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
    if (ret != sizeof(int64_t)) {
      terrno = TSDB_CODE_FILE_CORRUPTED;
      return -1;
    }
    if (i < SDB_MAX) {
      pSdb->maxId[i] = maxId;
    }
  }

  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
S
Shengliang Guan 已提交
143
    int64_t ver = 0;
144
    ret = taosReadFile(pFile, &ver, sizeof(int64_t));
S
Shengliang Guan 已提交
145 146 147 148 149 150 151 152 153 154 155 156 157 158
    if (ret < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
    if (ret != sizeof(int64_t)) {
      terrno = TSDB_CODE_FILE_CORRUPTED;
      return -1;
    }
    if (i < SDB_MAX) {
      pSdb->tableVer[i] = ver;
    }
  }

  char reserve[SDB_RESERVE_SIZE] = {0};
159
  ret = taosReadFile(pFile, reserve, sizeof(reserve));
S
Shengliang Guan 已提交
160 161 162 163 164 165 166 167 168 169 170 171
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(reserve)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

  return 0;
}

172
static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
S
Shengliang Guan 已提交
173 174 175 176 177 178
  int64_t sver = SDB_FILE_VER;
  if (taosWriteFile(pFile, &sver, sizeof(int64_t)) != sizeof(int64_t)) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

S
Shengliang Guan 已提交
179
  if (taosWriteFile(pFile, &pSdb->applyIndex, sizeof(int64_t)) != sizeof(int64_t)) {
S
Shengliang Guan 已提交
180 181 182 183
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

S
Shengliang Guan 已提交
184
  if (taosWriteFile(pFile, &pSdb->applyTerm, sizeof(int64_t)) != sizeof(int64_t)) {
185 186 187 188
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

S
Shengliang Guan 已提交
189
  if (taosWriteFile(pFile, &pSdb->applyConfig, sizeof(int64_t)) != sizeof(int64_t)) {
S
Shengliang Guan 已提交
190 191 192 193
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

S
Shengliang Guan 已提交
194
  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
S
Shengliang Guan 已提交
195
    int64_t maxId = 0;
S
Shengliang Guan 已提交
196 197 198
    if (i < SDB_MAX) {
      maxId = pSdb->maxId[i];
    }
199
    if (taosWriteFile(pFile, &maxId, sizeof(int64_t)) != sizeof(int64_t)) {
S
Shengliang Guan 已提交
200 201 202 203 204 205
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
  }

  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
S
Shengliang Guan 已提交
206
    int64_t ver = 0;
S
Shengliang Guan 已提交
207 208 209
    if (i < SDB_MAX) {
      ver = pSdb->tableVer[i];
    }
210
    if (taosWriteFile(pFile, &ver, sizeof(int64_t)) != sizeof(int64_t)) {
S
Shengliang Guan 已提交
211 212 213 214 215
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
  }

S
Shengliang Guan 已提交
216
  char reserve[SDB_RESERVE_SIZE] = {0};
217
  if (taosWriteFile(pFile, reserve, sizeof(reserve)) != sizeof(reserve)) {
S
Shengliang Guan 已提交
218 219 220 221 222 223 224
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
225
static int32_t sdbReadFileImp(SSdb *pSdb) {
S
Shengliang Guan 已提交
226 227 228 229
  int64_t offset = 0;
  int32_t code = 0;
  int32_t readLen = 0;
  int64_t ret = 0;
S
Shengliang Guan 已提交
230 231 232
  char    file[PATH_MAX] = {0};

  snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
233
  mInfo("start to read sdb file:%s", file);
S
Shengliang Guan 已提交
234

235
  SSdbRaw *pRaw = taosMemoryMalloc(TSDB_MAX_MSG_SIZE + 100);
S
Shengliang Guan 已提交
236
  if (pRaw == NULL) {
S
Shengliang Guan 已提交
237
    terrno = TSDB_CODE_OUT_OF_MEMORY;
238
    mError("failed read sdb file since %s", terrstr());
S
Shengliang Guan 已提交
239
    return -1;
S
Shengliang Guan 已提交
240 241
  }

242 243
  TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
wafwerar's avatar
wafwerar 已提交
244
    taosMemoryFree(pRaw);
S
Shengliang Guan 已提交
245
    terrno = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
246
    mInfo("read sdb file:%s finished since %s", file, terrstr());
S
Shengliang Guan 已提交
247
    return 0;
S
Shengliang Guan 已提交
248 249
  }

250
  if (sdbReadFileHead(pSdb, pFile) != 0) {
251
    mError("failed to read sdb file:%s head since %s", file, terrstr());
wafwerar's avatar
wafwerar 已提交
252
    taosMemoryFree(pRaw);
253
    taosCloseFile(&pFile);
S
Shengliang Guan 已提交
254 255 256
    return -1;
  }

S
Shengliang Guan 已提交
257 258 259
  int64_t tableVer[SDB_MAX] = {0};
  memcpy(tableVer, pSdb->tableVer, sizeof(tableVer));

S
Shengliang Guan 已提交
260
  while (1) {
S
Shengliang Guan 已提交
261
    readLen = sizeof(SSdbRaw);
262
    ret = taosReadFile(pFile, pRaw, readLen);
S
Shengliang Guan 已提交
263 264 265 266
    if (ret == 0) break;

    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
267
      mError("failed to read sdb file:%s since %s", file, tstrerror(code));
268
      goto _OVER;
S
Shengliang Guan 已提交
269 270
    }

S
Shengliang Guan 已提交
271
    if (ret != readLen) {
S
Shengliang Guan 已提交
272
      code = TSDB_CODE_FILE_CORRUPTED;
273 274
      mError("failed to read sdb file:%s since %s, ret:%" PRId64 " != readLen:%d", file, tstrerror(code), ret, readLen);
      goto _OVER;
S
Shengliang Guan 已提交
275 276
    }

S
Shengliang Guan 已提交
277
    readLen = pRaw->dataLen + sizeof(int32_t);
278 279 280 281 282
    if (readLen >= pRaw->dataLen) {
      SSdbRaw *pNewRaw = taosMemoryMalloc(pRaw->dataLen + TSDB_MAX_MSG_SIZE);
      if (pNewRaw == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        mError("failed read sdb file since malloc new sdbRaw size:%d failed", pRaw->dataLen + TSDB_MAX_MSG_SIZE);
283
        goto _OVER;
284
      }
285
      mInfo("malloc new sdbRaw size:%d, type:%d", pRaw->dataLen + TSDB_MAX_MSG_SIZE, pRaw->type);
286 287 288 289 290
      memcpy(pNewRaw, pRaw, sizeof(SSdbRaw));
      sdbFreeRaw(pRaw);
      pRaw = pNewRaw;
    }

291
    ret = taosReadFile(pFile, pRaw->pData, readLen);
S
Shengliang Guan 已提交
292 293
    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
294 295
      mError("failed to read sdb file:%s since %s, ret:%" PRId64 " readLen:%d", file, tstrerror(code), ret, readLen);
      goto _OVER;
S
Shengliang Guan 已提交
296 297
    }

S
Shengliang Guan 已提交
298
    if (ret != readLen) {
S
Shengliang Guan 已提交
299
      code = TSDB_CODE_FILE_CORRUPTED;
300 301
      mError("failed to read sdb file:%s since %s, ret:%" PRId64 " != readLen:%d", file, tstrerror(code), ret, readLen);
      goto _OVER;
S
Shengliang Guan 已提交
302 303
    }

S
Shengliang Guan 已提交
304
    int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t);
305
    if ((!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen)) != 0) {
S
Shengliang Guan 已提交
306
      code = TSDB_CODE_CHECKSUM_ERROR;
307 308
      mError("failed to read sdb file:%s since %s, readLen:%d", file, tstrerror(code), readLen);
      goto _OVER;
S
Shengliang Guan 已提交
309 310
    }

311
    code = sdbWriteWithoutFree(pSdb, pRaw);
S
Shengliang Guan 已提交
312
    if (code != 0) {
313
      mError("failed to read sdb file:%s since %s", file, terrstr());
S
Shengliang Guan 已提交
314
      goto _OVER;
S
Shengliang Guan 已提交
315 316 317 318
    }
  }

  code = 0;
S
Shengliang Guan 已提交
319 320 321
  pSdb->commitIndex = pSdb->applyIndex;
  pSdb->commitTerm = pSdb->applyTerm;
  pSdb->commitConfig = pSdb->applyConfig;
S
Shengliang Guan 已提交
322
  memcpy(pSdb->tableVer, tableVer, sizeof(tableVer));
H
Hongze Cheng 已提交
323 324
  mInfo("read sdb file:%s success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64, file, pSdb->commitIndex,
        pSdb->commitTerm, pSdb->commitConfig);
S
Shengliang Guan 已提交
325

S
Shengliang Guan 已提交
326
_OVER:
327
  taosCloseFile(&pFile);
S
Shengliang Guan 已提交
328
  sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
329

S
Shengliang Guan 已提交
330
  terrno = code;
S
Shengliang Guan 已提交
331 332 333
  return code;
}

S
Shengliang Guan 已提交
334 335 336 337 338 339
int32_t sdbReadFile(SSdb *pSdb) {
  taosThreadMutexLock(&pSdb->filelock);

  sdbResetData(pSdb);
  int32_t code = sdbReadFileImp(pSdb);
  if (code != 0) {
340
    mError("failed to read sdb file since %s", terrstr());
S
Shengliang Guan 已提交
341 342 343 344 345 346 347
    sdbResetData(pSdb);
  }

  taosThreadMutexUnlock(&pSdb->filelock);
  return code;
}

S
Shengliang Guan 已提交
348
static int32_t sdbWriteFileImp(SSdb *pSdb) {
S
Shengliang Guan 已提交
349 350
  int32_t code = 0;

S
Shengliang Guan 已提交
351
  char tmpfile[PATH_MAX] = {0};
S
Shengliang Guan 已提交
352
  snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
S
Shengliang Guan 已提交
353
  char curfile[PATH_MAX] = {0};
S
Shengliang Guan 已提交
354
  snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
S
Shengliang Guan 已提交
355

356
  mInfo("start to write sdb file, apply index:%" PRId64 " term:%" PRId64 " config:%" PRId64 ", commit index:%" PRId64
H
Hongze Cheng 已提交
357 358 359
        " term:%" PRId64 " config:%" PRId64 ", file:%s",
        pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig, pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig,
        curfile);
S
Shengliang Guan 已提交
360

361
  TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
362
  if (pFile == NULL) {
S
Shengliang Guan 已提交
363
    terrno = TAOS_SYSTEM_ERROR(errno);
364
    mError("failed to open sdb file:%s for write since %s", tmpfile, terrstr());
S
Shengliang Guan 已提交
365
    return -1;
S
Shengliang Guan 已提交
366 367
  }

368
  if (sdbWriteFileHead(pSdb, pFile) != 0) {
369
    mError("failed to write sdb file:%s head since %s", tmpfile, terrstr());
370
    taosCloseFile(&pFile);
S
Shengliang Guan 已提交
371 372 373 374
    return -1;
  }

  for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
S
Shengliang Guan 已提交
375
    SdbEncodeFp encodeFp = pSdb->encodeFps[i];
S
Shengliang Guan 已提交
376 377
    if (encodeFp == NULL) continue;

378
    mInfo("write %s to sdb file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));
S
Shengliang Guan 已提交
379

380 381
    SHashObj *hash = pSdb->hashObjs[i];
    sdbWriteLock(pSdb, i);
S
Shengliang Guan 已提交
382

S
Shengliang Guan 已提交
383 384 385
    SSdbRow **ppRow = taosHashIterate(hash, NULL);
    while (ppRow != NULL) {
      SSdbRow *pRow = *ppRow;
386 387 388 389 390 391 392
      if (pRow == NULL) {
        ppRow = taosHashIterate(hash, ppRow);
        continue;
      }

      if (pRow->status != SDB_STATUS_READY && pRow->status != SDB_STATUS_DROPPING) {
        sdbPrintOper(pSdb, pRow, "not-write");
S
Shengliang Guan 已提交
393 394 395
        ppRow = taosHashIterate(hash, ppRow);
        continue;
      }
S
Shengliang Guan 已提交
396

397
      sdbPrintOper(pSdb, pRow, "write");
S
Shengliang Guan 已提交
398

S
Shengliang Guan 已提交
399 400
      SSdbRaw *pRaw = (*encodeFp)(pRow->pObj);
      if (pRaw != NULL) {
S
Shengliang Guan 已提交
401
        pRaw->status = pRow->status;
S
Shengliang Guan 已提交
402
        int32_t writeLen = sizeof(SSdbRaw) + pRaw->dataLen;
403
        if (taosWriteFile(pFile, pRaw, writeLen) != writeLen) {
S
Shengliang Guan 已提交
404
          code = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
405
          taosHashCancelIterate(hash, ppRow);
S
Shengliang Guan 已提交
406
          sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
407 408
          break;
        }
S
Shengliang Guan 已提交
409 410

        int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
411
        if (taosWriteFile(pFile, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
S
Shengliang Guan 已提交
412
          code = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
413
          taosHashCancelIterate(hash, ppRow);
S
Shengliang Guan 已提交
414
          sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
415 416
          break;
        }
S
Shengliang Guan 已提交
417
      } else {
S
Shengliang Guan 已提交
418
        code = TSDB_CODE_APP_ERROR;
S
Shengliang Guan 已提交
419
        taosHashCancelIterate(hash, ppRow);
S
Shengliang Guan 已提交
420 421 422
        break;
      }

S
Shengliang Guan 已提交
423
      sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
424
      ppRow = taosHashIterate(hash, ppRow);
S
Shengliang Guan 已提交
425
    }
426
    sdbUnLock(pSdb, i);
S
Shengliang Guan 已提交
427 428 429
  }

  if (code == 0) {
430
    code = taosFsyncFile(pFile);
S
Shengliang Guan 已提交
431 432
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
433
      mError("failed to sync sdb file:%s since %s", tmpfile, tstrerror(code));
S
Shengliang Guan 已提交
434
    }
S
Shengliang Guan 已提交
435 436
  }

437
  taosCloseFile(&pFile);
S
Shengliang Guan 已提交
438

S
Shengliang Guan 已提交
439
  if (code == 0) {
S
Shengliang Guan 已提交
440
    code = taosRenameFile(tmpfile, curfile);
S
Shengliang Guan 已提交
441 442
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
443
      mError("failed to write sdb file:%s since %s", curfile, tstrerror(code));
S
Shengliang Guan 已提交
444
    }
S
Shengliang Guan 已提交
445 446 447
  }

  if (code != 0) {
448
    mError("failed to write sdb file:%s since %s", curfile, tstrerror(code));
S
Shengliang Guan 已提交
449
  } else {
S
Shengliang Guan 已提交
450 451 452
    pSdb->commitIndex = pSdb->applyIndex;
    pSdb->commitTerm = pSdb->applyTerm;
    pSdb->commitConfig = pSdb->applyConfig;
453
    mInfo("write sdb file success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
H
Hongze Cheng 已提交
454
          pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig, curfile);
S
Shengliang Guan 已提交
455 456
  }

S
Shengliang Guan 已提交
457
  terrno = code;
S
Shengliang Guan 已提交
458 459 460
  return code;
}

461
int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) {
462
  int32_t code = 0;
S
Shengliang Guan 已提交
463
  if (pSdb->applyIndex == pSdb->commitIndex) {
S
Shengliang Guan 已提交
464 465 466
    return 0;
  }

467 468 469 470
  if (pSdb->applyIndex - pSdb->commitIndex < delta) {
    return 0;
  }

S
Shengliang Guan 已提交
471
  taosThreadMutexLock(&pSdb->filelock);
472
  if (pSdb->pWal != NULL) {
473 474 475 476 477 478
    // code = walBeginSnapshot(pSdb->pWal, pSdb->applyIndex);
    if (pSdb->sync == 0) {
      code = 0;
    } else {
      code = syncBeginSnapshot(pSdb->sync, pSdb->applyIndex);
    }
479 480 481 482 483 484
  }
  if (code == 0) {
    code = sdbWriteFileImp(pSdb);
  }
  if (code == 0) {
    if (pSdb->pWal != NULL) {
485 486 487 488 489 490 491
      // code = walEndSnapshot(pSdb->pWal);

      if (pSdb->sync == 0) {
        code = 0;
      } else {
        code = syncEndSnapshot(pSdb->sync);
      }
492 493
    }
  }
S
Shengliang Guan 已提交
494
  if (code != 0) {
495
    mError("failed to write sdb file since %s", terrstr());
S
Shengliang Guan 已提交
496 497 498
  }
  taosThreadMutexUnlock(&pSdb->filelock);
  return code;
S
Shengliang Guan 已提交
499 500
}

S
Shengliang Guan 已提交
501
int32_t sdbDeploy(SSdb *pSdb) {
S
Shengliang Guan 已提交
502
  if (sdbDeployData(pSdb) != 0) {
S
Shengliang Guan 已提交
503
    return -1;
S
Shengliang Guan 已提交
504 505
  }

506
  if (sdbWriteFile(pSdb, 0) != 0) {
S
Shengliang Guan 已提交
507
    return -1;
S
Shengliang Guan 已提交
508 509 510 511
  }

  return 0;
}
S
Shengliang Guan 已提交
512

S
Shengliang Guan 已提交
513
static SSdbIter *sdbCreateIter(SSdb *pSdb) {
S
Shengliang Guan 已提交
514 515 516 517 518 519
  SSdbIter *pIter = taosMemoryCalloc(1, sizeof(SSdbIter));
  if (pIter == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

S
Shengliang Guan 已提交
520 521 522 523
  char name[PATH_MAX + 100] = {0};
  snprintf(name, sizeof(name), "%s%ssdb.data.%" PRIu64, pSdb->tmpDir, TD_DIRSEP, (uint64_t)pIter);
  pIter->name = strdup(name);
  if (pIter->name == NULL) {
S
Shengliang Guan 已提交
524
    taosMemoryFree(pIter);
S
Shengliang Guan 已提交
525
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
526 527 528 529 530 531
    return NULL;
  }

  return pIter;
}

S
Shengliang Guan 已提交
532
static void sdbCloseIter(SSdbIter *pIter) {
S
Shengliang Guan 已提交
533
  if (pIter == NULL) return;
S
Shengliang Guan 已提交
534

S
Shengliang Guan 已提交
535 536
  if (pIter->file != NULL) {
    taosCloseFile(&pIter->file);
S
Shengliang Guan 已提交
537
    pIter->file = NULL;
S
Shengliang Guan 已提交
538
  }
S
Shengliang Guan 已提交
539

S
Shengliang Guan 已提交
540
  if (pIter->name != NULL) {
S
Shengliang Guan 已提交
541
    (void)taosRemoveFile(pIter->name);
S
Shengliang Guan 已提交
542 543 544
    taosMemoryFree(pIter->name);
    pIter->name = NULL;
  }
S
Shengliang Guan 已提交
545

546
  mInfo("sdbiter:%p, is closed, total:%" PRId64, pIter, pIter->total);
S
Shengliang Guan 已提交
547 548 549
  taosMemoryFree(pIter);
}

550
int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter, int64_t *index, int64_t *term, int64_t *config) {
S
Shengliang Guan 已提交
551 552
  SSdbIter *pIter = sdbCreateIter(pSdb);
  if (pIter == NULL) return -1;
S
Shengliang Guan 已提交
553

S
Shengliang Guan 已提交
554 555 556 557
  char datafile[PATH_MAX] = {0};
  snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);

  taosThreadMutexLock(&pSdb->filelock);
S
Shengliang Guan 已提交
558 559 560
  int64_t commitIndex = pSdb->commitIndex;
  int64_t commitTerm = pSdb->commitTerm;
  int64_t commitConfig = pSdb->commitConfig;
S
Shengliang Guan 已提交
561 562 563
  if (taosCopyFile(datafile, pIter->name) < 0) {
    taosThreadMutexUnlock(&pSdb->filelock);
    terrno = TAOS_SYSTEM_ERROR(errno);
564
    mError("failed to copy sdb file %s to %s since %s", datafile, pIter->name, terrstr());
S
Shengliang Guan 已提交
565 566
    sdbCloseIter(pIter);
    return -1;
S
Shengliang Guan 已提交
567
  }
S
Shengliang Guan 已提交
568
  taosThreadMutexUnlock(&pSdb->filelock);
S
Shengliang Guan 已提交
569

S
Shengliang Guan 已提交
570 571 572
  pIter->file = taosOpenFile(pIter->name, TD_FILE_READ);
  if (pIter->file == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
573
    mError("failed to open sdb file:%s since %s", pIter->name, terrstr());
S
Shengliang Guan 已提交
574 575 576 577 578
    sdbCloseIter(pIter);
    return -1;
  }

  *ppIter = pIter;
579 580 581 582
  if (index != NULL) *index = commitIndex;
  if (term != NULL) *term = commitTerm;
  if (config != NULL) *config = commitConfig;

H
Hongze Cheng 已提交
583 584
  mInfo("sdbiter:%p, is created to read snapshot, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
        pIter, commitIndex, commitTerm, commitConfig, pIter->name);
S
Shengliang Guan 已提交
585
  return 0;
S
Shengliang Guan 已提交
586 587
}

588
void sdbStopRead(SSdb *pSdb, SSdbIter *pIter) { sdbCloseIter(pIter); }
S
Shengliang Guan 已提交
589

S
Shengliang Guan 已提交
590
int32_t sdbDoRead(SSdb *pSdb, SSdbIter *pIter, void **ppBuf, int32_t *len) {
591
  int32_t maxlen = 4096;
S
Shengliang Guan 已提交
592
  void   *pBuf = taosMemoryCalloc(1, maxlen);
S
Shengliang Guan 已提交
593 594
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
595
    return -1;
S
Shengliang Guan 已提交
596 597 598
  }

  int32_t readlen = taosReadFile(pIter->file, pBuf, maxlen);
S
Shengliang Guan 已提交
599
  if (readlen < 0 || readlen > maxlen) {
S
Shengliang Guan 已提交
600
    terrno = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
601 602 603
    mError("sdbiter:%p, failed to read snapshot since %s, total:%" PRId64, pIter, terrstr(), pIter->total);
    *ppBuf = NULL;
    *len = 0;
S
Shengliang Guan 已提交
604
    taosMemoryFree(pBuf);
S
Shengliang Guan 已提交
605 606
    return -1;
  } else if (readlen == 0) {
607
    mInfo("sdbiter:%p, read snapshot to the end, total:%" PRId64, pIter, pIter->total);
S
Shengliang Guan 已提交
608 609 610 611
    *ppBuf = NULL;
    *len = 0;
    taosMemoryFree(pBuf);
    return 0;
S
Shengliang Guan 已提交
612
  } else {  // (readlen <= maxlen)
S
Shengliang Guan 已提交
613
    pIter->total += readlen;
614
    mInfo("sdbiter:%p, read:%d bytes from snapshot, total:%" PRId64, pIter, readlen, pIter->total);
S
Shengliang Guan 已提交
615 616 617
    *ppBuf = pBuf;
    *len = readlen;
    return 0;
S
Shengliang Guan 已提交
618 619
  }
}
S
Shengliang Guan 已提交
620

S
Shengliang Guan 已提交
621 622 623
int32_t sdbStartWrite(SSdb *pSdb, SSdbIter **ppIter) {
  SSdbIter *pIter = sdbCreateIter(pSdb);
  if (pIter == NULL) return -1;
S
Shengliang Guan 已提交
624

S
Shengliang Guan 已提交
625 626
  pIter->file = taosOpenFile(pIter->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pIter->file == NULL) {
S
Shengliang Guan 已提交
627
    terrno = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
628
    mError("failed to open %s since %s", pIter->name, terrstr());
S
Shengliang Guan 已提交
629
    sdbCloseIter(pIter);
S
Shengliang Guan 已提交
630 631 632
    return -1;
  }

S
Shengliang Guan 已提交
633
  *ppIter = pIter;
634
  mInfo("sdbiter:%p, is created to write snapshot, file:%s", pIter, pIter->name);
S
Shengliang Guan 已提交
635 636
  return 0;
}
S
Shengliang Guan 已提交
637

638
int32_t sdbStopWrite(SSdb *pSdb, SSdbIter *pIter, bool isApply, int64_t index, int64_t term, int64_t config) {
639
  int32_t code = -1;
S
Shengliang Guan 已提交
640 641

  if (!isApply) {
642
    mInfo("sdbiter:%p, not apply to sdb", pIter);
643 644
    code = 0;
    goto _OVER;
S
Shengliang Guan 已提交
645 646
  }

647 648 649 650
  if (taosFsyncFile(pIter->file) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("sdbiter:%p, failed to fasync file %s since %s", pIter, pIter->name, terrstr());
    goto _OVER;
S
Shengliang Guan 已提交
651 652
  }

S
Shengliang Guan 已提交
653 654
  taosCloseFile(&pIter->file);
  pIter->file = NULL;
S
Shengliang Guan 已提交
655

S
Shengliang Guan 已提交
656 657 658
  char datafile[PATH_MAX] = {0};
  snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
  if (taosRenameFile(pIter->name, datafile) != 0) {
S
Shengliang Guan 已提交
659
    terrno = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
660
    mError("sdbiter:%p, failed to rename file %s to %s since %s", pIter, pIter->name, datafile, terrstr());
661
    goto _OVER;
S
Shengliang Guan 已提交
662 663 664
  }

  if (sdbReadFile(pSdb) != 0) {
S
Shengliang Guan 已提交
665
    mError("sdbiter:%p, failed to read from %s since %s", pIter, datafile, terrstr());
666
    goto _OVER;
S
Shengliang Guan 已提交
667 668
  }

669 670 671 672 673 674 675 676 677 678
  if (config > 0) {
    pSdb->commitConfig = config;
  }
  if (term > 0) {
    pSdb->commitTerm = term;
  }
  if (index > 0) {
    pSdb->commitIndex = index;
  }

679
  mInfo("sdbiter:%p, success applyed to sdb", pIter);
680 681 682
  code = 0;

_OVER:
S
Shengliang Guan 已提交
683
  sdbCloseIter(pIter);
684
  return code;
S
Shengliang Guan 已提交
685 686 687 688 689 690 691
}

int32_t sdbDoWrite(SSdb *pSdb, SSdbIter *pIter, void *pBuf, int32_t len) {
  int32_t writelen = taosWriteFile(pIter->file, pBuf, len);
  if (writelen != len) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to write len:%d since %s, total:%" PRId64, len, terrstr(), pIter->total);
S
Shengliang Guan 已提交
692 693 694
    return -1;
  }

S
Shengliang Guan 已提交
695
  pIter->total += writelen;
696
  mInfo("sdbiter:%p, write:%d bytes to snapshot, total:%" PRId64, pIter, writelen, pIter->total);
S
Shengliang Guan 已提交
697
  return 0;
698
}