sdbFile.c 10.2 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "sdbInt.h"
S
Shengliang Guan 已提交
18
#include "tchecksum.h"
S
Shengliang Guan 已提交
19
#include "wal.h"
S
Shengliang Guan 已提交
20

S
Shengliang Guan 已提交
21
#define SDB_TABLE_SIZE   24
S
Shengliang Guan 已提交
22
#define SDB_RESERVE_SIZE 512
S
Shengliang Guan 已提交
23
#define SDB_FILE_VER     1
S
Shengliang Guan 已提交
24

S
Shengliang Guan 已提交
25
static int32_t sdbRunDeployFp(SSdb *pSdb) {
S
Shengliang Guan 已提交
26
  mDebug("start to deploy sdb");
S
Shengliang Guan 已提交
27

S
Shengliang Guan 已提交
28
  for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
S
Shengliang Guan 已提交
29
    SdbDeployFp fp = pSdb->deployFps[i];
S
Shengliang Guan 已提交
30
    if (fp == NULL) continue;
S
Shengliang Guan 已提交
31

S
Shengliang Guan 已提交
32
    if ((*fp)(pSdb->pMnode) != 0) {
33
      mError("failed to deploy sdb:%s since %s", sdbTableName(i), terrstr());
S
Shengliang Guan 已提交
34
      return -1;
S
Shengliang Guan 已提交
35 36 37
    }
  }

S
Shengliang Guan 已提交
38
  mDebug("sdb deploy successfully");
S
Shengliang Guan 已提交
39 40 41
  return 0;
}

42
static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
S
Shengliang Guan 已提交
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
  int64_t sver = 0;
  int32_t ret = taosReadFile(pFile, &sver, sizeof(int64_t));
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(int64_t)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }
  if (sver != SDB_FILE_VER) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

  ret = taosReadFile(pFile, &pSdb->curVer, sizeof(int64_t));
S
Shengliang Guan 已提交
59 60 61 62 63 64 65 66 67
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(int64_t)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

68 69 70 71 72 73 74 75 76 77
  ret = taosReadFile(pFile, &pSdb->curTerm, sizeof(int64_t));
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(int64_t)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

S
Shengliang Guan 已提交
78
  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
S
Shengliang Guan 已提交
79
    int64_t maxId = 0;
80
    ret = taosReadFile(pFile, &maxId, sizeof(int64_t));
S
Shengliang Guan 已提交
81 82 83 84 85 86 87 88 89 90 91 92 93 94
    if (ret < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
    if (ret != sizeof(int64_t)) {
      terrno = TSDB_CODE_FILE_CORRUPTED;
      return -1;
    }
    if (i < SDB_MAX) {
      pSdb->maxId[i] = maxId;
    }
  }

  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
S
Shengliang Guan 已提交
95
    int64_t ver = 0;
96
    ret = taosReadFile(pFile, &ver, sizeof(int64_t));
S
Shengliang Guan 已提交
97 98 99 100 101 102 103 104 105 106 107 108 109 110
    if (ret < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
    if (ret != sizeof(int64_t)) {
      terrno = TSDB_CODE_FILE_CORRUPTED;
      return -1;
    }
    if (i < SDB_MAX) {
      pSdb->tableVer[i] = ver;
    }
  }

  char reserve[SDB_RESERVE_SIZE] = {0};
111
  ret = taosReadFile(pFile, reserve, sizeof(reserve));
S
Shengliang Guan 已提交
112 113 114 115 116 117 118 119 120 121 122 123
  if (ret < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (ret != sizeof(reserve)) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

  return 0;
}

124
static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
S
Shengliang Guan 已提交
125 126 127 128 129 130
  int64_t sver = SDB_FILE_VER;
  if (taosWriteFile(pFile, &sver, sizeof(int64_t)) != sizeof(int64_t)) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

131
  if (taosWriteFile(pFile, &pSdb->curVer, sizeof(int64_t)) != sizeof(int64_t)) {
S
Shengliang Guan 已提交
132 133 134 135
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

136 137 138 139 140
  if (taosWriteFile(pFile, &pSdb->curTerm, sizeof(int64_t)) != sizeof(int64_t)) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

S
Shengliang Guan 已提交
141
  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
S
Shengliang Guan 已提交
142
    int64_t maxId = 0;
S
Shengliang Guan 已提交
143 144 145
    if (i < SDB_MAX) {
      maxId = pSdb->maxId[i];
    }
146
    if (taosWriteFile(pFile, &maxId, sizeof(int64_t)) != sizeof(int64_t)) {
S
Shengliang Guan 已提交
147 148 149 150 151 152
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
  }

  for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
S
Shengliang Guan 已提交
153
    int64_t ver = 0;
S
Shengliang Guan 已提交
154 155 156
    if (i < SDB_MAX) {
      ver = pSdb->tableVer[i];
    }
157
    if (taosWriteFile(pFile, &ver, sizeof(int64_t)) != sizeof(int64_t)) {
S
Shengliang Guan 已提交
158 159 160 161 162
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
  }

S
Shengliang Guan 已提交
163
  char reserve[SDB_RESERVE_SIZE] = {0};
164
  if (taosWriteFile(pFile, reserve, sizeof(reserve)) != sizeof(reserve)) {
S
Shengliang Guan 已提交
165 166 167 168 169 170 171
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
172 173 174 175 176 177
int32_t sdbReadFile(SSdb *pSdb) {
  int64_t offset = 0;
  int32_t code = 0;
  int32_t readLen = 0;
  int64_t ret = 0;

S
Shengliang Guan 已提交
178
  SSdbRaw *pRaw = taosMemoryMalloc(WAL_MAX_SIZE + 100);
S
Shengliang Guan 已提交
179
  if (pRaw == NULL) {
S
Shengliang Guan 已提交
180
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
181
    mError("failed read file since %s", terrstr());
S
Shengliang Guan 已提交
182
    return -1;
S
Shengliang Guan 已提交
183 184 185
  }

  char file[PATH_MAX] = {0};
S
Shengliang Guan 已提交
186
  snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
S
Shengliang Guan 已提交
187
  mDebug("start to read file:%s", file);
S
Shengliang Guan 已提交
188

189 190
  TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
wafwerar's avatar
wafwerar 已提交
191
    taosMemoryFree(pRaw);
S
Shengliang Guan 已提交
192 193
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to read file:%s since %s", file, terrstr());
S
Shengliang Guan 已提交
194
    return 0;
S
Shengliang Guan 已提交
195 196
  }

197
  if (sdbReadFileHead(pSdb, pFile) != 0) {
S
Shengliang Guan 已提交
198 199
    mError("failed to read file:%s head since %s", file, terrstr());
    pSdb->curVer = -1;
200
    pSdb->curTerm = -1;
wafwerar's avatar
wafwerar 已提交
201
    taosMemoryFree(pRaw);
202
    taosCloseFile(&pFile);
S
Shengliang Guan 已提交
203 204 205
    return -1;
  }

S
Shengliang Guan 已提交
206 207 208
  int64_t tableVer[SDB_MAX] = {0};
  memcpy(tableVer, pSdb->tableVer, sizeof(tableVer));

S
Shengliang Guan 已提交
209
  while (1) {
S
Shengliang Guan 已提交
210
    readLen = sizeof(SSdbRaw);
211
    ret = taosReadFile(pFile, pRaw, readLen);
S
Shengliang Guan 已提交
212 213 214 215 216 217 218 219
    if (ret == 0) break;

    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read file:%s since %s", file, tstrerror(code));
      break;
    }

S
Shengliang Guan 已提交
220
    if (ret != readLen) {
S
Shengliang Guan 已提交
221 222 223 224 225
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      break;
    }

S
Shengliang Guan 已提交
226
    readLen = pRaw->dataLen + sizeof(int32_t);
227
    ret = taosReadFile(pFile, pRaw->pData, readLen);
S
Shengliang Guan 已提交
228 229 230 231 232 233
    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read file:%s since %s", file, tstrerror(code));
      break;
    }

S
Shengliang Guan 已提交
234
    if (ret != readLen) {
S
Shengliang Guan 已提交
235 236 237 238 239
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      break;
    }

S
Shengliang Guan 已提交
240
    int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t);
241
    if ((!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen)) != 0) {
S
Shengliang Guan 已提交
242
      code = TSDB_CODE_CHECKSUM_ERROR;
S
Shengliang Guan 已提交
243 244 245 246
      mError("failed to read file:%s since %s", file, tstrerror(code));
      break;
    }

247
    code = sdbWriteWithoutFree(pSdb, pRaw);
S
Shengliang Guan 已提交
248
    if (code != 0) {
S
Shengliang Guan 已提交
249
      mError("failed to read file:%s since %s", file, terrstr());
S
Shengliang Guan 已提交
250
      goto _OVER;
S
Shengliang Guan 已提交
251 252 253 254
    }
  }

  code = 0;
S
Shengliang Guan 已提交
255
  pSdb->lastCommitVer = pSdb->curVer;
S
Shengliang Guan 已提交
256
  memcpy(pSdb->tableVer, tableVer, sizeof(tableVer));
S
Shengliang Guan 已提交
257
  mDebug("read file:%s successfully, ver:%" PRId64, file, pSdb->lastCommitVer);
S
Shengliang Guan 已提交
258

S
Shengliang Guan 已提交
259
_OVER:
260
  taosCloseFile(&pFile);
S
Shengliang Guan 已提交
261
  sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
262

S
Shengliang Guan 已提交
263
  terrno = code;
S
Shengliang Guan 已提交
264 265 266
  return code;
}

S
Shengliang Guan 已提交
267
static int32_t sdbWriteFileImp(SSdb *pSdb) {
S
Shengliang Guan 已提交
268 269
  int32_t code = 0;

S
Shengliang Guan 已提交
270
  char tmpfile[PATH_MAX] = {0};
S
Shengliang Guan 已提交
271
  snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
S
Shengliang Guan 已提交
272
  char curfile[PATH_MAX] = {0};
S
Shengliang Guan 已提交
273
  snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
S
Shengliang Guan 已提交
274

275 276
  mDebug("start to write file:%s, current ver:%" PRId64 " term:%" PRId64 ", commit ver:%" PRId64, curfile, pSdb->curVer,
         pSdb->curTerm, pSdb->lastCommitVer);
S
Shengliang Guan 已提交
277

278
  TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
279
  if (pFile == NULL) {
S
Shengliang Guan 已提交
280 281 282
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to open file:%s for write since %s", tmpfile, terrstr());
    return -1;
S
Shengliang Guan 已提交
283 284
  }

285
  if (sdbWriteFileHead(pSdb, pFile) != 0) {
S
Shengliang Guan 已提交
286
    mError("failed to write file:%s head since %s", tmpfile, terrstr());
287
    taosCloseFile(&pFile);
S
Shengliang Guan 已提交
288 289 290 291
    return -1;
  }

  for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
S
Shengliang Guan 已提交
292
    SdbEncodeFp encodeFp = pSdb->encodeFps[i];
S
Shengliang Guan 已提交
293 294
    if (encodeFp == NULL) continue;

S
Shengliang Guan 已提交
295
    mTrace("write %s to file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));
S
Shengliang Guan 已提交
296

S
Shengliang Guan 已提交
297
    SHashObj       *hash = pSdb->hashObjs[i];
298 299
    TdThreadRwlock *pLock = &pSdb->locks[i];
    taosThreadRwlockWrlock(pLock);
S
Shengliang Guan 已提交
300

S
Shengliang Guan 已提交
301 302 303
    SSdbRow **ppRow = taosHashIterate(hash, NULL);
    while (ppRow != NULL) {
      SSdbRow *pRow = *ppRow;
304 305 306 307 308 309 310
      if (pRow == NULL) {
        ppRow = taosHashIterate(hash, ppRow);
        continue;
      }

      if (pRow->status != SDB_STATUS_READY && pRow->status != SDB_STATUS_DROPPING) {
        sdbPrintOper(pSdb, pRow, "not-write");
S
Shengliang Guan 已提交
311 312 313
        ppRow = taosHashIterate(hash, ppRow);
        continue;
      }
S
Shengliang Guan 已提交
314

315
      sdbPrintOper(pSdb, pRow, "write");
S
Shengliang Guan 已提交
316

S
Shengliang Guan 已提交
317 318
      SSdbRaw *pRaw = (*encodeFp)(pRow->pObj);
      if (pRaw != NULL) {
S
Shengliang Guan 已提交
319
        pRaw->status = pRow->status;
S
Shengliang Guan 已提交
320
        int32_t writeLen = sizeof(SSdbRaw) + pRaw->dataLen;
321
        if (taosWriteFile(pFile, pRaw, writeLen) != writeLen) {
S
Shengliang Guan 已提交
322
          code = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
323
          taosHashCancelIterate(hash, ppRow);
S
Shengliang Guan 已提交
324
          sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
325 326
          break;
        }
S
Shengliang Guan 已提交
327 328

        int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
329
        if (taosWriteFile(pFile, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
S
Shengliang Guan 已提交
330
          code = TAOS_SYSTEM_ERROR(errno);
S
Shengliang Guan 已提交
331
          taosHashCancelIterate(hash, ppRow);
S
Shengliang Guan 已提交
332
          sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
333 334
          break;
        }
S
Shengliang Guan 已提交
335 336
      } else {
        code = TSDB_CODE_SDB_APP_ERROR;
S
Shengliang Guan 已提交
337
        taosHashCancelIterate(hash, ppRow);
S
Shengliang Guan 已提交
338 339 340
        break;
      }

S
Shengliang Guan 已提交
341
      sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
342
      ppRow = taosHashIterate(hash, ppRow);
S
Shengliang Guan 已提交
343
    }
344
    taosThreadRwlockUnlock(pLock);
S
Shengliang Guan 已提交
345 346 347
  }

  if (code == 0) {
348
    code = taosFsyncFile(pFile);
S
Shengliang Guan 已提交
349 350
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
wafwerar's avatar
wafwerar 已提交
351
      mError("failed to sync file:%s since %s", tmpfile, tstrerror(code));
S
Shengliang Guan 已提交
352
    }
S
Shengliang Guan 已提交
353 354
  }

355
  taosCloseFile(&pFile);
S
Shengliang Guan 已提交
356

S
Shengliang Guan 已提交
357
  if (code == 0) {
S
Shengliang Guan 已提交
358
    code = taosRenameFile(tmpfile, curfile);
S
Shengliang Guan 已提交
359 360 361 362
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to write file:%s since %s", curfile, tstrerror(code));
    }
S
Shengliang Guan 已提交
363 364 365
  }

  if (code != 0) {
S
Shengliang Guan 已提交
366
    mError("failed to write file:%s since %s", curfile, tstrerror(code));
S
Shengliang Guan 已提交
367
  } else {
S
Shengliang Guan 已提交
368
    pSdb->lastCommitVer = pSdb->curVer;
369
    mDebug("write file:%s successfully, ver:%" PRId64 " term:%" PRId64, curfile, pSdb->lastCommitVer, pSdb->curTerm);
S
Shengliang Guan 已提交
370 371
  }

S
Shengliang Guan 已提交
372
  terrno = code;
S
Shengliang Guan 已提交
373 374 375
  return code;
}

S
Shengliang Guan 已提交
376 377 378 379 380 381 382 383
int32_t sdbWriteFile(SSdb *pSdb) {
  if (pSdb->curVer == pSdb->lastCommitVer) {
    return 0;
  }

  return sdbWriteFileImp(pSdb);
}

S
Shengliang Guan 已提交
384
int32_t sdbDeploy(SSdb *pSdb) {
S
Shengliang Guan 已提交
385 386
  if (sdbRunDeployFp(pSdb) != 0) {
    return -1;
S
Shengliang Guan 已提交
387 388
  }

S
Shengliang Guan 已提交
389
  if (sdbWriteFileImp(pSdb) != 0) {
S
Shengliang Guan 已提交
390
    return -1;
S
Shengliang Guan 已提交
391 392 393 394
  }

  return 0;
}