/* sdbFile.c */
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "sdbInt.h"
#include "tchecksum.h"
#include "wal.h"

#define SDB_TABLE_SIZE 24
#define SDB_RESERVE_SIZE 512

S
Shengliang Guan 已提交
24
/*
 * Run every registered deploy callback, visiting the table slots from
 * SDB_MAX - 1 down to 0. Slots that have no callback are skipped.
 *
 * Returns 0 when every callback returns 0, or -1 as soon as one callback
 * returns a non-zero value (the remaining slots are not visited).
 */
static int32_t sdbRunDeployFp(SSdb *pSdb) {
  mDebug("start to deploy sdb");

  int32_t type = SDB_MAX;
  while (--type >= 0) {
    SdbDeployFp deployFp = pSdb->deployFps[type];
    if (deployFp != NULL && (*deployFp)(pSdb->pMnode) != 0) {
      mError("failed to deploy sdb:%s since %s", sdbTableName(type), terrstr());
      return -1;
    }
  }

  mDebug("sdb deploy successfully");
  return 0;
}

41 42
/*
 * Read exactly `size` bytes from pFile into pBuf.
 *
 * Returns 0 on success. On failure returns -1 with terrno set to the system
 * error when the read itself fails, or to TSDB_CODE_FILE_CORRUPTED when fewer
 * bytes than requested were returned.
 */
static int32_t sdbReadHeadField(TdFilePtr pFile, void *pBuf, int32_t size) {
  int32_t nread = taosReadFile(pFile, pBuf, size);
  if (nread < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (nread != size) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }
  return 0;
}

/*
 * Read the fixed-size file head, in this order:
 *   - curVer                          (one int64_t, stored into pSdb->curVer)
 *   - SDB_TABLE_SIZE max-id slots     (int64_t each)
 *   - SDB_TABLE_SIZE table-ver slots  (int64_t each)
 *   - SDB_RESERVE_SIZE reserved bytes (read and discarded)
 *
 * Only the first SDB_MAX slots of each array are copied into pSdb; the
 * remaining slots are consumed from the file but ignored.
 *
 * Returns 0 on success, -1 on failure with terrno set.
 */
static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
  if (sdbReadHeadField(pFile, &pSdb->curVer, sizeof(int64_t)) != 0) return -1;

  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t value = -1;
    if (sdbReadHeadField(pFile, &value, sizeof(value)) != 0) return -1;
    if (slot < SDB_MAX) pSdb->maxId[slot] = value;
  }

  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t value = -1;
    if (sdbReadHeadField(pFile, &value, sizeof(value)) != 0) return -1;
    if (slot < SDB_MAX) pSdb->tableVer[slot] = value;
  }

  // The reserved area carries no data today, but it must still be consumed
  // so the file position ends up at the first record.
  char padding[SDB_RESERVE_SIZE] = {0};
  return sdbReadHeadField(pFile, padding, sizeof(padding));
}

98 99
/*
 * Write the fixed-size file head, in this order:
 *   - pSdb->curVer                    (one int64_t)
 *   - SDB_TABLE_SIZE max-id slots     (int64_t each)
 *   - SDB_TABLE_SIZE table-ver slots  (int64_t each)
 *   - SDB_RESERVE_SIZE zero bytes
 *
 * Slots at index >= SDB_MAX have no in-memory counterpart and are written
 * as -1.
 *
 * Returns 0 on success, or -1 with terrno set to the system error as soon as
 * any write does not transfer the full length.
 */
static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
  const int64_t fieldLen = sizeof(int64_t);
  char          padding[SDB_RESERVE_SIZE] = {0};

  if (taosWriteFile(pFile, &pSdb->curVer, fieldLen) != fieldLen) goto WRITE_HEAD_FAILED;

  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t maxId = (slot < SDB_MAX) ? pSdb->maxId[slot] : -1;
    if (taosWriteFile(pFile, &maxId, fieldLen) != fieldLen) goto WRITE_HEAD_FAILED;
  }

  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t ver = (slot < SDB_MAX) ? pSdb->tableVer[slot] : -1;
    if (taosWriteFile(pFile, &ver, fieldLen) != fieldLen) goto WRITE_HEAD_FAILED;
  }

  if (taosWriteFile(pFile, padding, sizeof(padding)) != sizeof(padding)) goto WRITE_HEAD_FAILED;

  return 0;

WRITE_HEAD_FAILED:
  // Reached directly after the failing write, so errno still belongs to it.
  terrno = TAOS_SYSTEM_ERROR(errno);
  return -1;
}

S
Shengliang Guan 已提交
135 136 137 138 139 140
/*
 * Load "<currDir>/sdb.data" into the in-memory sdb.
 *
 * The file consists of the fixed head (see sdbReadFileHead) followed by a
 * sequence of records. Each record is an SSdbRaw header, `dataLen` payload
 * bytes and a trailing int32_t checksum covering the header and payload.
 * Every record that passes the checksum is applied via sdbWriteWithoutFree.
 *
 * Return value:
 *   0   all records were loaded, or the file could not be opened (see the
 *       note at the open call; terrno is still set in that case);
 *   -1  the record buffer could not be allocated or the head is unreadable
 *       (terrno is set; on a head failure pSdb->curVer is reset to -1);
 *   otherwise the error code of the first record that failed, which is also
 *       stored in terrno.
 *
 * pSdb->lastCommitVer is set to pSdb->curVer only when the whole file was
 * read successfully.
 */
int32_t sdbReadFile(SSdb *pSdb) {
  int32_t code = 0;
  int32_t readLen = 0;
  int64_t ret = 0;

  // A single buffer is reused for every record, so it bounds the largest
  // payload that can be accepted from the file.
  const int64_t bufLen = (int64_t)(WAL_MAX_SIZE + 100);
  const int64_t maxDataLen = bufLen - (int64_t)sizeof(SSdbRaw) - (int64_t)sizeof(int32_t);

  SSdbRaw *pRaw = taosMemoryMalloc(WAL_MAX_SIZE + 100);
  if (pRaw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed read file since %s", terrstr());
    return -1;
  }

  char file[PATH_MAX] = {0};
  snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
  mDebug("start to read file:%s", file);

  TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
    // Capture errno first: releasing the buffer may modify it.
    terrno = TAOS_SYSTEM_ERROR(errno);
    taosMemoryFree(pRaw);
    mError("failed to read file:%s since %s", file, terrstr());
    // NOTE(review): an unopenable file is reported as success, as in the
    // original code - presumably so a missing file on first start is not
    // fatal. Confirm against the callers.
    return 0;
  }

  if (sdbReadFileHead(pSdb, pFile) != 0) {
    mError("failed to read file:%s head since %s", file, terrstr());
    pSdb->curVer = -1;
    taosMemoryFree(pRaw);
    taosCloseFile(&pFile);
    return -1;
  }

  while (1) {
    // Record header. A clean end-of-file is only legal on a record boundary.
    readLen = sizeof(SSdbRaw);
    ret = taosReadFile(pFile, pRaw, readLen);
    if (ret == 0) break;

    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto READ_FILE_OVER;
    }

    if (ret != readLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto READ_FILE_OVER;
    }

    // dataLen comes from the file: reject values that would make the payload
    // read run past the end of the record buffer.
    if (pRaw->dataLen < 0 || pRaw->dataLen > maxDataLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto READ_FILE_OVER;
    }

    // Payload plus the trailing checksum.
    readLen = pRaw->dataLen + sizeof(int32_t);
    ret = taosReadFile(pFile, pRaw->pData, readLen);
    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto READ_FILE_OVER;
    }

    if (ret != readLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto READ_FILE_OVER;
    }

    int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t);
    if (!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen)) {
      code = TSDB_CODE_CHECKSUM_ERROR;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto READ_FILE_OVER;
    }

    code = sdbWriteWithoutFree(pSdb, pRaw);
    if (code != 0) {
      mError("failed to read file:%s since %s", file, terrstr());
      goto READ_FILE_OVER;
    }
  }

  // Only reached when every record was read and applied; all failures above
  // jump straight to the cleanup label so their error code is not lost.
  pSdb->lastCommitVer = pSdb->curVer;
  mDebug("read file:%s successfully, ver:%" PRId64, file, pSdb->lastCommitVer);

READ_FILE_OVER:
  taosCloseFile(&pFile);
  sdbFreeRaw(pRaw);

  terrno = code;
  return code;
}

S
Shengliang Guan 已提交
225
/*
 * Write the whole in-memory sdb to disk.
 *
 * The data is first written to "<tmpDir>/sdb.data": the file head (see
 * sdbWriteFileHead), then, for every table slot that has an encode callback
 * (visited from SDB_MAX - 1 down to 0), each row whose status is
 * SDB_STATUS_READY as an encoded SSdbRaw followed by an int32_t checksum.
 * Each table's latch is write-locked while its rows are iterated.
 *
 * If everything was written, the file is fsync'ed, closed and renamed to
 * "<currDir>/sdb.data", and pSdb->lastCommitVer is set to pSdb->curVer.
 *
 * Writing stops at the first row that fails to encode or write: the remaining
 * rows and tables are not visited, because the temporary file is not renamed
 * into place on failure anyway.
 *
 * Returns 0 on success. Returns -1 (terrno set) when the temporary file
 * cannot be opened or its head cannot be written; otherwise returns the
 * error code of the failure, which is also stored in terrno.
 */
static int32_t sdbWriteFileImp(SSdb *pSdb) {
  int32_t code = 0;

  char tmpfile[PATH_MAX] = {0};
  snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
  char curfile[PATH_MAX] = {0};
  snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);

  mDebug("start to write file:%s, current ver:%" PRId64 ", commit ver:%" PRId64, curfile, pSdb->curVer,
         pSdb->lastCommitVer);

  TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to open file:%s for write since %s", tmpfile, terrstr());
    return -1;
  }

  if (sdbWriteFileHead(pSdb, pFile) != 0) {
    mError("failed to write file:%s head since %s", tmpfile, terrstr());
    taosCloseFile(&pFile);
    return -1;
  }

  // `code == 0` in the loop condition stops the table walk after the first
  // failure, so the first error code is the one that gets reported.
  for (int32_t i = SDB_MAX - 1; i >= 0 && code == 0; --i) {
    SdbEncodeFp encodeFp = pSdb->encodeFps[i];
    if (encodeFp == NULL) continue;

    mTrace("write %s to file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));

    SHashObj *hash = pSdb->hashObjs[i];
    SRWLatch *pLock = &pSdb->locks[i];
    taosWLockLatch(pLock);

    SSdbRow **ppRow = taosHashIterate(hash, NULL);
    while (ppRow != NULL) {
      SSdbRow *pRow = *ppRow;
      if (pRow != NULL && pRow->status == SDB_STATUS_READY) {
        sdbPrintOper(pSdb, pRow, "write");

        SSdbRaw *pRaw = (*encodeFp)(pRow->pObj);
        if (pRaw == NULL) {
          code = TSDB_CODE_SDB_APP_ERROR;
        } else {
          pRaw->status = pRow->status;
          int32_t writeLen = sizeof(SSdbRaw) + pRaw->dataLen;
          // The checksum covers the header and payload and is stored right
          // after them in the file.
          int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, writeLen);
          if (taosWriteFile(pFile, pRaw, writeLen) != writeLen ||
              taosWriteFile(pFile, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
            // Read errno before any cleanup call can change it.
            code = TAOS_SYSTEM_ERROR(errno);
          }
          sdbFreeRaw(pRaw);
        }

        if (code != 0) {
          // Leaving the iteration early: the iterator must be cancelled.
          taosHashCancelIterate(hash, ppRow);
          break;
        }
      }
      ppRow = taosHashIterate(hash, ppRow);
    }
    taosWUnLockLatch(pLock);
  }

  if (code == 0) {
    code = taosFsyncFile(pFile);
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to write file:%s since %s", tmpfile, tstrerror(code));
    }
  }

  taosCloseFile(&pFile);

  if (code == 0) {
    // The rename publishes the new file only after it has been fully
    // written and synced.
    code = taosRenameFile(tmpfile, curfile);
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to write file:%s since %s", curfile, tstrerror(code));
    }
  }

  if (code != 0) {
    mError("failed to write file:%s since %s", curfile, tstrerror(code));
  } else {
    pSdb->lastCommitVer = pSdb->curVer;
    mDebug("write file:%s successfully, ver:%" PRId64, curfile, pSdb->lastCommitVer);
  }

  terrno = code;
  return code;
}

S
Shengliang Guan 已提交
328 329 330 331 332 333 334 335
/*
 * Write the sdb to disk only when it has changed since the last commit,
 * i.e. when curVer differs from lastCommitVer. Returns 0 when nothing needs
 * to be written, otherwise the result of sdbWriteFileImp.
 */
int32_t sdbWriteFile(SSdb *pSdb) {
  return (pSdb->curVer != pSdb->lastCommitVer) ? sdbWriteFileImp(pSdb) : 0;
}

S
Shengliang Guan 已提交
336
/*
 * Run all deploy callbacks and then write the resulting sdb to disk
 * unconditionally. The write is skipped when the deploy step fails.
 *
 * Returns 0 when both steps succeed, -1 otherwise.
 */
int32_t sdbDeploy(SSdb *pSdb) {
  int32_t failed = (sdbRunDeployFp(pSdb) != 0) || (sdbWriteFileImp(pSdb) != 0);
  return failed ? -1 : 0;
}