sdbFile.c 9.0 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "sdbInt.h"
S
Shengliang Guan 已提交
18
#include "tchecksum.h"
S
Shengliang Guan 已提交
19

S
Shengliang Guan 已提交
20 21 22
// Number of per-table slots (one maxId and one tableVer each) serialized in
// the sdb.data file head. Slots whose index is >= SDB_MAX are written as -1
// and ignored when the head is read back.
#define SDB_TABLE_SIZE 24
// Size in bytes of the zero-filled reserved area that ends the file head.
#define SDB_RESERVE_SIZE 512

S
Shengliang Guan 已提交
23
/*
 * Invoke every registered deploy callback, walking the table types from
 * SDB_MAX - 1 down to 0. Slots without a callback are skipped.
 *
 * Returns 0 when every callback reports success, or -1 as soon as one of
 * them returns a non-zero value (the remaining callbacks are not run).
 */
static int32_t sdbRunDeployFp(SSdb *pSdb) {
  mDebug("start to deploy sdb");

  int32_t type = SDB_MAX;
  while (--type >= 0) {
    SdbDeployFp deployFp = pSdb->deployFps[type];
    if (deployFp != NULL && deployFp(pSdb->pMnode) != 0) {
      mError("failed to deploy sdb:%d since %s", type, terrstr());
      return -1;
    }
  }

  mDebug("sdb deploy successfully");
  return 0;
}

40 41
/*
 * Read exactly `len` bytes of the file head into `pBuf`.
 *
 * Returns 0 on success. Returns -1 with terrno set to the system error when
 * the read itself fails, or to TSDB_CODE_FILE_CORRUPTED when fewer bytes than
 * requested were available.
 */
static int32_t sdbReadHeadChunk(TdFilePtr pFile, void *pBuf, size_t len) {
  int32_t nread = taosReadFile(pFile, pBuf, len);
  if (nread < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (nread != len) {
    terrno = TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }
  return 0;
}

/*
 * Parse the sdb file head from the current position of `pFile`.
 *
 * Layout, in order: curVer (int64), SDB_TABLE_SIZE maxId slots (int64 each),
 * SDB_TABLE_SIZE tableVer slots (int64 each), SDB_RESERVE_SIZE reserved bytes.
 * Slots with an index >= SDB_MAX are consumed but not stored.
 *
 * Returns 0 on success, -1 on failure with terrno set.
 */
static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
  if (sdbReadHeadChunk(pFile, &pSdb->curVer, sizeof(int64_t)) != 0) {
    return -1;
  }

  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t maxId = -1;
    if (sdbReadHeadChunk(pFile, &maxId, sizeof(int64_t)) != 0) {
      return -1;
    }
    if (slot < SDB_MAX) {
      pSdb->maxId[slot] = maxId;
    }
  }

  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t ver = -1;
    if (sdbReadHeadChunk(pFile, &ver, sizeof(int64_t)) != 0) {
      return -1;
    }
    if (slot < SDB_MAX) {
      pSdb->tableVer[slot] = ver;
    }
  }

  // The reserved area carries no data; it is read only to validate its
  // presence and to advance the file position past the head.
  char reserve[SDB_RESERVE_SIZE] = {0};
  return sdbReadHeadChunk(pFile, reserve, sizeof(reserve));
}

97 98
/*
 * Write exactly `len` bytes from `pBuf` to the file head.
 *
 * Returns 0 on success, or -1 with terrno set to the system error when the
 * number of bytes written differs from `len`.
 */
static int32_t sdbWriteHeadChunk(TdFilePtr pFile, const void *pBuf, size_t len) {
  if (taosWriteFile(pFile, pBuf, len) != len) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  return 0;
}

/*
 * Serialize the sdb file head at the current position of `pFile`.
 *
 * Layout, in order: curVer (int64), SDB_TABLE_SIZE maxId slots (int64 each),
 * SDB_TABLE_SIZE tableVer slots (int64 each), SDB_RESERVE_SIZE zero bytes.
 * Slots with an index >= SDB_MAX are filled with -1.
 *
 * Returns 0 on success, -1 on failure with terrno set.
 */
static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
  if (sdbWriteHeadChunk(pFile, &pSdb->curVer, sizeof(int64_t)) != 0) {
    return -1;
  }

  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t maxId = (slot < SDB_MAX) ? pSdb->maxId[slot] : -1;
    if (sdbWriteHeadChunk(pFile, &maxId, sizeof(int64_t)) != 0) {
      return -1;
    }
  }

  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t ver = (slot < SDB_MAX) ? pSdb->tableVer[slot] : -1;
    if (sdbWriteHeadChunk(pFile, &ver, sizeof(int64_t)) != 0) {
      return -1;
    }
  }

  char reserve[SDB_RESERVE_SIZE] = {0};
  return sdbWriteHeadChunk(pFile, reserve, sizeof(reserve));
}

S
Shengliang Guan 已提交
134 135 136 137 138 139
/*
 * Load the sdb from "<currDir>/sdb.data".
 *
 * The file consists of the head parsed by sdbReadFileHead() followed by a
 * sequence of records. Each record is an SSdbRaw header, `dataLen` payload
 * bytes and a trailing int32 checksum covering header + payload + checksum.
 * Every record that passes validation is handed to sdbWriteNotFree().
 *
 * Returns:
 *   0   on success, and also when the file cannot be opened (the open error
 *       is logged and left in terrno, but not reported as a failure).
 *   -1  when the read buffer cannot be allocated or the head cannot be read
 *       (terrno describes the cause).
 *   otherwise the error code of the first record that failed to be read,
 *       validated or applied; the same value is stored in terrno.
 *
 * pSdb->lastCommitVer is updated only when the whole file was consumed
 * without error.
 */
int32_t sdbReadFile(SSdb *pSdb) {
  int32_t code = 0;
  int32_t readLen = 0;
  int64_t ret = 0;

  SSdbRaw *pRaw = taosMemoryMalloc(SDB_MAX_SIZE);
  if (pRaw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed read file since %s", terrstr());
    return -1;
  }

  char file[PATH_MAX] = {0};
  snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
  mDebug("start to read file:%s", file);

  TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
    taosMemoryFree(pRaw);
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to read file:%s since %s", file, terrstr());
    // NOTE(review): returning 0 here is kept from the original; presumably a
    // missing sdb.data is expected on first start - confirm with callers.
    return 0;
  }

  if (sdbReadFileHead(pSdb, pFile) != 0) {
    mError("failed to read file:%s head since %s", file, terrstr());
    pSdb->curVer = -1;
    taosMemoryFree(pRaw);
    taosCloseFile(&pFile);
    return -1;
  }

  // Largest payload that still leaves room for the record header and the
  // trailing checksum inside the SDB_MAX_SIZE buffer.
  const int64_t maxDataLen = (int64_t)SDB_MAX_SIZE - (int64_t)sizeof(SSdbRaw) - (int64_t)sizeof(int32_t);

  while (1) {
    readLen = sizeof(SSdbRaw);
    ret = taosReadFile(pFile, pRaw, readLen);
    if (ret == 0) break;  // clean end of file: no more records

    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto SDB_READ_FILE_OVER;
    }

    if (ret != readLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto SDB_READ_FILE_OVER;
    }

    // dataLen comes straight from the file; reject values that would make the
    // payload read below run past the end of the buffer.
    if (pRaw->dataLen < 0 || pRaw->dataLen > maxDataLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto SDB_READ_FILE_OVER;
    }

    readLen = pRaw->dataLen + sizeof(int32_t);
    ret = taosReadFile(pFile, pRaw->pData, readLen);
    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto SDB_READ_FILE_OVER;
    }

    if (ret != readLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto SDB_READ_FILE_OVER;
    }

    int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t);
    if (!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen)) {
      code = TSDB_CODE_CHECKSUM_ERROR;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto SDB_READ_FILE_OVER;
    }

    code = sdbWriteNotFree(pSdb, pRaw);
    if (code != 0) {
      mError("failed to read file:%s since %s", file, terrstr());
      goto SDB_READ_FILE_OVER;
    }
  }

  // Reached only through the end-of-file break above, so code is 0 here.
  pSdb->lastCommitVer = pSdb->curVer;
  mDebug("read file:%s successfully, ver:%" PRId64, file, pSdb->lastCommitVer);

SDB_READ_FILE_OVER:
  taosCloseFile(&pFile);
  sdbFreeRaw(pRaw);

  terrno = code;
  return code;
}

S
Shengliang Guan 已提交
224
/*
 * Persist the whole sdb to disk.
 *
 * The data is first written to "<tmpDir>/sdb.data": the file head, then for
 * every table type that has an encode callback (from SDB_MAX - 1 down to 0)
 * one record per row in SDB_STATUS_READY state. A record is the encoded
 * SSdbRaw (header + payload) followed by an int32 checksum. The table's
 * latch is held in write mode while its rows are iterated.
 *
 * When everything was written, the file is fsync'ed, closed and renamed to
 * "<currDir>/sdb.data", and pSdb->lastCommitVer is set to pSdb->curVer.
 *
 * Writing stops at the first row that fails to encode or to be written; the
 * remaining tables are not visited and the temporary file is not renamed.
 *
 * Returns 0 on success. Returns -1 when the temporary file cannot be opened
 * or its head cannot be written; otherwise returns the error code of the
 * first failure, which is also stored in terrno.
 */
static int32_t sdbWriteFileImp(SSdb *pSdb) {
  int32_t code = 0;

  char tmpfile[PATH_MAX] = {0};
  snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
  char curfile[PATH_MAX] = {0};
  snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);

  mDebug("start to write file:%s, current ver:%" PRId64 ", commit ver:%" PRId64, curfile, pSdb->curVer,
         pSdb->lastCommitVer);

  TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to open file:%s for write since %s", tmpfile, terrstr());
    return -1;
  }

  if (sdbWriteFileHead(pSdb, pFile) != 0) {
    mError("failed to write file:%s head since %s", tmpfile, terrstr());
    taosCloseFile(&pFile);
    return -1;
  }

  // `code == 0` in the loop condition stops the dump after the first failed
  // table, so the first error is the one reported and no further tables are
  // locked or written into a file that will be thrown away.
  for (int32_t i = SDB_MAX - 1; i >= 0 && code == 0; --i) {
    SdbEncodeFp encodeFp = pSdb->encodeFps[i];
    if (encodeFp == NULL) continue;

    mTrace("write %s to file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));

    SHashObj *hash = pSdb->hashObjs[i];
    SRWLatch *pLock = &pSdb->locks[i];
    taosWLockLatch(pLock);

    SSdbRow **ppRow = taosHashIterate(hash, NULL);
    while (ppRow != NULL) {
      SSdbRow *pRow = *ppRow;
      if (pRow == NULL || pRow->status != SDB_STATUS_READY) {
        ppRow = taosHashIterate(hash, ppRow);
        continue;
      }

      sdbPrintOper(pSdb, pRow, "writeFile");

      SSdbRaw *pRaw = (*encodeFp)(pRow->pObj);
      if (pRaw == NULL) {
        code = TSDB_CODE_SDB_APP_ERROR;
      } else {
        pRaw->status = pRow->status;
        int32_t writeLen = sizeof(SSdbRaw) + pRaw->dataLen;
        if (taosWriteFile(pFile, pRaw, writeLen) != writeLen) {
          code = TAOS_SYSTEM_ERROR(errno);
        } else {
          int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
          if (taosWriteFile(pFile, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
            code = TAOS_SYSTEM_ERROR(errno);
          }
        }
        // errno has already been captured into code, so freeing is safe here.
        sdbFreeRaw(pRaw);
      }

      if (code != 0) {
        // Leaving the iteration early: release the iterator explicitly.
        taosHashCancelIterate(hash, ppRow);
        break;
      }

      ppRow = taosHashIterate(hash, ppRow);
    }
    taosWUnLockLatch(pLock);
  }

  if (code == 0) {
    code = taosFsyncFile(pFile);
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to write file:%s since %s", tmpfile, tstrerror(code));
    }
  }

  taosCloseFile(&pFile);

  if (code == 0) {
    // Replace the current file only after the temporary one is complete.
    code = taosRenameFile(tmpfile, curfile);
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to write file:%s since %s", curfile, tstrerror(code));
    }
  }

  if (code != 0) {
    mError("failed to write file:%s since %s", curfile, tstrerror(code));
  } else {
    pSdb->lastCommitVer = pSdb->curVer;
    mDebug("write file:%s successfully, ver:%" PRId64, curfile, pSdb->lastCommitVer);
  }

  terrno = code;
  return code;
}

S
Shengliang Guan 已提交
327 328 329 330 331 332 333 334
/*
 * Write the sdb to disk unless the in-memory version already equals the
 * last committed one, in which case nothing is done and 0 is returned.
 * Otherwise returns the result of sdbWriteFileImp().
 */
int32_t sdbWriteFile(SSdb *pSdb) {
  return (pSdb->curVer == pSdb->lastCommitVer) ? 0 : sdbWriteFileImp(pSdb);
}

S
Shengliang Guan 已提交
335
/*
 * Run all deploy callbacks and then unconditionally write the sdb file.
 * The file is not written when a deploy callback fails.
 *
 * Returns 0 when both steps succeed, -1 otherwise.
 */
int32_t sdbDeploy(SSdb *pSdb) {
  if (sdbRunDeployFp(pSdb) != 0 || sdbWriteFileImp(pSdb) != 0) {
    return -1;
  }
  return 0;
}