sdbFile.c 9.3 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "sdbInt.h"
S
Shengliang Guan 已提交
18
#include "tchecksum.h"
S
Shengliang Guan 已提交
19
#include "wal.h"
S
Shengliang Guan 已提交
20

S
Shengliang Guan 已提交
21
#define SDB_TABLE_SIZE   24
S
Shengliang Guan 已提交
22 23
#define SDB_RESERVE_SIZE 512

S
Shengliang Guan 已提交
24
/*
 * Run every registered deploy callback, visiting the table slots from
 * SDB_MAX - 1 down to 0. Slots without a callback are skipped.
 *
 * Returns 0 when every callback returned 0, or -1 as soon as one of them
 * returns non-zero (the remaining callbacks are not invoked).
 */
static int32_t sdbRunDeployFp(SSdb *pSdb) {
  mDebug("start to deploy sdb");

  int32_t type = SDB_MAX;
  while (--type >= 0) {
    SdbDeployFp deployFp = pSdb->deployFps[type];
    if (deployFp != NULL && deployFp(pSdb->pMnode) != 0) {
      mError("failed to deploy sdb:%s since %s", sdbTableName(type), terrstr());
      return -1;
    }
  }

  mDebug("sdb deploy successfully");
  return 0;
}

41 42
/*
 * Read the fixed-size file header from pFile into pSdb.
 *
 * Header layout, in read order:
 *   - one int64_t            -> pSdb->curVer
 *   - SDB_TABLE_SIZE int64_t -> pSdb->maxId[]    (only slots below SDB_MAX are kept)
 *   - SDB_TABLE_SIZE int64_t -> pSdb->tableVer[] (only slots below SDB_MAX are kept)
 *   - SDB_RESERVE_SIZE bytes of reserved space (read and discarded)
 *
 * Returns 0 on success. On failure returns -1 with terrno set to the system
 * error when the read itself failed, or to TSDB_CODE_FILE_CORRUPTED when
 * fewer bytes than expected were returned.
 */
static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
  const int32_t fieldLen = sizeof(int64_t);

  int32_t nread = taosReadFile(pFile, &pSdb->curVer, fieldLen);
  if (nread != fieldLen) {
    terrno = (nread < 0) ? TAOS_SYSTEM_ERROR(errno) : TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t maxId = 0;
    nread = taosReadFile(pFile, &maxId, fieldLen);
    if (nread != fieldLen) {
      terrno = (nread < 0) ? TAOS_SYSTEM_ERROR(errno) : TSDB_CODE_FILE_CORRUPTED;
      return -1;
    }
    /* The file always carries SDB_TABLE_SIZE slots; extra ones are consumed but ignored. */
    if (slot < SDB_MAX) {
      pSdb->maxId[slot] = maxId;
    }
  }

  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t tableVer = 0;
    nread = taosReadFile(pFile, &tableVer, fieldLen);
    if (nread != fieldLen) {
      terrno = (nread < 0) ? TAOS_SYSTEM_ERROR(errno) : TSDB_CODE_FILE_CORRUPTED;
      return -1;
    }
    if (slot < SDB_MAX) {
      pSdb->tableVer[slot] = tableVer;
    }
  }

  char    reserve[SDB_RESERVE_SIZE] = {0};
  int32_t reserveLen = sizeof(reserve);
  nread = taosReadFile(pFile, reserve, reserveLen);
  if (nread != reserveLen) {
    terrno = (nread < 0) ? TAOS_SYSTEM_ERROR(errno) : TSDB_CODE_FILE_CORRUPTED;
    return -1;
  }

  return 0;
}

98 99
/*
 * Write the fixed-size file header for pSdb to pFile.
 *
 * Header layout, in write order:
 *   - pSdb->curVer as one int64_t
 *   - SDB_TABLE_SIZE int64_t taken from pSdb->maxId[]    (0 for slots at or above SDB_MAX)
 *   - SDB_TABLE_SIZE int64_t taken from pSdb->tableVer[] (0 for slots at or above SDB_MAX)
 *   - SDB_RESERVE_SIZE zero bytes of reserved space
 *
 * Returns 0 on success. If any write does not report the full length,
 * returns -1 with terrno set from errno.
 */
static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
  const int64_t fieldLen = sizeof(int64_t);

  if (taosWriteFile(pFile, &pSdb->curVer, fieldLen) != fieldLen) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t maxId = (slot < SDB_MAX) ? pSdb->maxId[slot] : 0;
    if (taosWriteFile(pFile, &maxId, fieldLen) != fieldLen) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
  }

  for (int32_t slot = 0; slot < SDB_TABLE_SIZE; ++slot) {
    int64_t tableVer = (slot < SDB_MAX) ? pSdb->tableVer[slot] : 0;
    if (taosWriteFile(pFile, &tableVer, fieldLen) != fieldLen) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
  }

  char          reserve[SDB_RESERVE_SIZE] = {0};
  const int64_t reserveLen = sizeof(reserve);
  if (taosWriteFile(pFile, reserve, reserveLen) != reserveLen) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
135 136 137 138 139 140
/*
 * Load "<currDir>/sdb.data" into pSdb.
 *
 * The file is a header (see sdbReadFileHead) followed by records. Each record
 * is an SSdbRaw header, dataLen payload bytes and a trailing int32_t checksum.
 * Every record is checksum-verified and handed to sdbWriteWithoutFree().
 *
 * Returns:
 *   0         the whole file was loaded, or the file could not be opened
 *             (terrno is still set in that case).
 *             NOTE(review): presumably an unopenable file means "nothing to
 *             load yet" - confirm against the callers.
 *   -1        the header could not be read (pSdb->curVer is set to -1).
 *   non-zero  error code (also stored in terrno) when a record could not be
 *             read, is truncated, has an invalid length, fails its checksum,
 *             or is rejected by sdbWriteWithoutFree().
 *
 * On a record failure pSdb->lastCommitVer is left untouched and the saved
 * table versions are not restored.
 */
int32_t sdbReadFile(SSdb *pSdb) {
  int32_t code = 0;
  int32_t readLen = 0;
  int64_t ret = 0;

  /* One reusable buffer holds a record: SSdbRaw header + payload + checksum. */
  const int64_t bufLen = (int64_t)(WAL_MAX_SIZE + 100);
  const int64_t maxDataLen = bufLen - (int64_t)sizeof(SSdbRaw) - (int64_t)sizeof(int32_t);

  SSdbRaw *pRaw = taosMemoryMalloc(bufLen);
  if (pRaw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed read file since %s", terrstr());
    return -1;
  }

  char file[PATH_MAX] = {0};
  snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
  mDebug("start to read file:%s", file);

  TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
    taosMemoryFree(pRaw);
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to read file:%s since %s", file, terrstr());
    return 0;
  }

  if (sdbReadFileHead(pSdb, pFile) != 0) {
    mError("failed to read file:%s head since %s", file, terrstr());
    pSdb->curVer = -1;
    taosMemoryFree(pRaw);
    taosCloseFile(&pFile);
    return -1;
  }

  /*
   * Snapshot the table versions read from the header; they are written back
   * after all records have been applied.
   * NOTE(review): presumably sdbWriteWithoutFree() modifies tableVer - confirm.
   */
  int64_t tableVer[SDB_MAX] = {0};
  memcpy(tableVer, pSdb->tableVer, sizeof(tableVer));

  while (1) {
    readLen = sizeof(SSdbRaw);
    ret = taosReadFile(pFile, pRaw, readLen);
    if (ret == 0) break; /* nothing more to read: normal end of the record stream */

    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    if (ret != readLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    /* dataLen comes from the file: reject values that would overrun the buffer. */
    int64_t dataLen = pRaw->dataLen;
    if (dataLen < 0 || dataLen > maxDataLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read file:%s since %s, invalid dataLen:%" PRId64, file, tstrerror(code), dataLen);
      goto _OVER;
    }

    /* Payload plus the trailing checksum. */
    readLen = pRaw->dataLen + sizeof(int32_t);
    ret = taosReadFile(pFile, pRaw->pData, readLen);
    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    if (ret != readLen) {
      code = TSDB_CODE_FILE_CORRUPTED;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t);
    if ((!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen)) != 0) {
      code = TSDB_CODE_CHECKSUM_ERROR;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      goto _OVER;
    }

    code = sdbWriteWithoutFree(pSdb, pRaw);
    if (code != 0) {
      mError("failed to read file:%s since %s", file, terrstr());
      goto _OVER;
    }
  }

  /* Only reached after a clean end of file: every record was applied. */
  code = 0;
  pSdb->lastCommitVer = pSdb->curVer;
  memcpy(pSdb->tableVer, tableVer, sizeof(tableVer));
  mDebug("read file:%s successfully, ver:%" PRId64, file, pSdb->lastCommitVer);

_OVER:
  taosCloseFile(&pFile);
  sdbFreeRaw(pRaw);

  terrno = code;
  return code;
}

S
Shengliang Guan 已提交
229
/*
 * Persist pSdb to "<currDir>/sdb.data".
 *
 * The image is first written to "<tmpDir>/sdb.data": the header
 * (sdbWriteFileHead), then, for every table slot that has an encode callback
 * (from SDB_MAX - 1 down to 0), each row whose status is SDB_STATUS_READY or
 * SDB_STATUS_DROPPING as an encoded SSdbRaw followed by an int32_t checksum.
 * Each table's write latch is held while its rows are iterated. When
 * everything was written, the temp file is fsync'ed, closed and renamed over
 * the current file, and pSdb->lastCommitVer is set to pSdb->curVer.
 *
 * Writing stops at the first row that fails to encode or write; the
 * remaining tables are not visited.
 *
 * Returns 0 on success. Returns -1 (terrno set by the failing step) when the
 * temp file cannot be opened or its header cannot be written; otherwise the
 * error code, which is also stored in terrno.
 */
static int32_t sdbWriteFileImp(SSdb *pSdb) {
  int32_t code = 0;

  char tmpfile[PATH_MAX] = {0};
  snprintf(tmpfile, sizeof(tmpfile), "%s%ssdb.data", pSdb->tmpDir, TD_DIRSEP);
  char curfile[PATH_MAX] = {0};
  snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);

  mDebug("start to write file:%s, current ver:%" PRId64 ", commit ver:%" PRId64, curfile, pSdb->curVer,
         pSdb->lastCommitVer);

  TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to open file:%s for write since %s", tmpfile, terrstr());
    return -1;
  }

  if (sdbWriteFileHead(pSdb, pFile) != 0) {
    mError("failed to write file:%s head since %s", tmpfile, terrstr());
    taosCloseFile(&pFile);
    return -1;
  }

  /* "code == 0" in the condition: once a row failed, skip the remaining tables. */
  for (int32_t i = SDB_MAX - 1; i >= 0 && code == 0; --i) {
    SdbEncodeFp encodeFp = pSdb->encodeFps[i];
    if (encodeFp == NULL) continue;

    mTrace("write %s to file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));

    SHashObj *hash = pSdb->hashObjs[i];
    SRWLatch *pLock = &pSdb->locks[i];
    taosWLockLatch(pLock);

    SSdbRow **ppRow = taosHashIterate(hash, NULL);
    while (ppRow != NULL) {
      SSdbRow *pRow = *ppRow;
      if (pRow == NULL) {
        ppRow = taosHashIterate(hash, ppRow);
        continue;
      }

      /* Only rows in READY or DROPPING state are persisted. */
      if (pRow->status != SDB_STATUS_READY && pRow->status != SDB_STATUS_DROPPING) {
        sdbPrintOper(pSdb, pRow, "not-write");
        ppRow = taosHashIterate(hash, ppRow);
        continue;
      }

      sdbPrintOper(pSdb, pRow, "write");

      SSdbRaw *pRaw = (*encodeFp)(pRow->pObj);
      if (pRaw == NULL) {
        code = TSDB_CODE_SDB_APP_ERROR;
        taosHashCancelIterate(hash, ppRow);
        break;
      }

      pRaw->status = pRow->status;
      int32_t writeLen = sizeof(SSdbRaw) + pRaw->dataLen;
      if (taosWriteFile(pFile, pRaw, writeLen) != writeLen) {
        code = TAOS_SYSTEM_ERROR(errno);
        taosHashCancelIterate(hash, ppRow);
        sdbFreeRaw(pRaw);
        break;
      }

      /* The checksum covers the raw header and payload and is stored right after them. */
      int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
      if (taosWriteFile(pFile, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
        code = TAOS_SYSTEM_ERROR(errno);
        taosHashCancelIterate(hash, ppRow);
        sdbFreeRaw(pRaw);
        break;
      }

      sdbFreeRaw(pRaw);
      ppRow = taosHashIterate(hash, ppRow);
    }
    taosWUnLockLatch(pLock);
  }

  if (code == 0) {
    code = taosFsyncFile(pFile);
    if (code != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to sync file:%s since %s", tmpfile, tstrerror(code));
    }
  }

  taosCloseFile(&pFile);

  /* Replace the current file only after the temp file is complete and synced. */
  if (code == 0) {
    code = taosRenameFile(tmpfile, curfile);
    if (code != 0) {
      /* Logged once by the summary below. */
      code = TAOS_SYSTEM_ERROR(errno);
    }
  }

  if (code != 0) {
    mError("failed to write file:%s since %s", curfile, tstrerror(code));
  } else {
    pSdb->lastCommitVer = pSdb->curVer;
    mDebug("write file:%s successfully, ver:%" PRId64, curfile, pSdb->lastCommitVer);
  }

  terrno = code;
  return code;
}

S
Shengliang Guan 已提交
338 339 340 341 342 343 344 345
/*
 * Write the sdb to file unless the in-memory version already equals the
 * last committed version, in which case nothing is done and 0 is returned.
 * Otherwise returns whatever sdbWriteFileImp() returns.
 */
int32_t sdbWriteFile(SSdb *pSdb) {
  if (pSdb->lastCommitVer != pSdb->curVer) {
    return sdbWriteFileImp(pSdb);
  }

  return 0;
}

S
Shengliang Guan 已提交
346
/*
 * Run the deploy callbacks and then write the sdb file unconditionally.
 * The file is not written when a deploy callback fails.
 *
 * Returns 0 when both steps succeed, -1 otherwise.
 */
int32_t sdbDeploy(SSdb *pSdb) {
  /* "||" short-circuits, so the write is attempted only after a successful deploy. */
  int32_t failed = (sdbRunDeployFp(pSdb) != 0) || (sdbWriteFileImp(pSdb) != 0);
  return failed ? -1 : 0;
}