sdb.c 9.2 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "sdbInt.h"
S
Shengliang Guan 已提交
18
#include "tglobal.h"
S
Shengliang Guan 已提交
19

S
Shengliang Guan 已提交
20
// File-local SDB manager state, zero-initialized at load time. The functions in this file read and
// write its directory paths (currDir/syncDir/tmpDir), per-table hash objects, key types, latches and
// callback tables (insert/update/delete/deploy/encode/decode).
static SSdbMgr tsSdb = {0};
S
Shengliang Guan 已提交
21

S
Shengliang Guan 已提交
22 23
static int32_t sdbCreateDir() {
  if (!taosMkDir(tsSdb.currDir)) {
S
Shengliang Guan 已提交
24 25 26
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to create dir:%s since %s", tsSdb.currDir, terrstr());
    return -1;
S
Shengliang Guan 已提交
27
  }
S
Shengliang Guan 已提交
28

S
Shengliang Guan 已提交
29
  if (!taosMkDir(tsSdb.syncDir)) {
S
Shengliang Guan 已提交
30 31
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to create dir:%s since %s", tsSdb.syncDir, terrstr());
S
Shengliang Guan 已提交
32 33 34 35
    return -1;
  }

  if (!taosMkDir(tsSdb.tmpDir)) {
S
Shengliang Guan 已提交
36 37
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to create dir:%s since %s", tsSdb.tmpDir, terrstr());
S
Shengliang Guan 已提交
38 39 40 41 42 43 44
    return -1;
  }

  return 0;
}

static int32_t sdbRunDeployFp() {
S
Shengliang Guan 已提交
45 46
  for (int32_t i = SDB_START; i < SDB_MAX; ++i) {
    SdbDeployFp fp = tsSdb.deployFps[i];
S
Shengliang Guan 已提交
47 48 49 50
    if (fp == NULL) continue;
    if ((*fp)() != 0) {
      mError("failed to deploy sdb:%d since %s", i, terrstr());
      return -1;
S
Shengliang Guan 已提交
51 52 53 54 55 56
    }
  }

  return 0;
}

S
Shengliang Guan 已提交
57 58
static SHashObj *sdbGetHash(int32_t sdb) {
  if (sdb >= SDB_MAX || sdb <= SDB_START) {
S
Shengliang Guan 已提交
59
    terrno = TSDB_CODE_SDB_INVALID_TABLE_TYPE;
S
Shengliang Guan 已提交
60
    return NULL;
S
Shengliang Guan 已提交
61 62
  }

S
Shengliang Guan 已提交
63 64
  SHashObj *hash = tsSdb.hashObjs[sdb];
  if (hash == NULL) {
S
Shengliang Guan 已提交
65
    terrno = TSDB_CODE_SDB_APP_ERROR;
S
Shengliang Guan 已提交
66 67
    return NULL;
  }
S
Shengliang Guan 已提交
68

S
Shengliang Guan 已提交
69
  return hash;
S
Shengliang Guan 已提交
70 71
}

S
Shengliang Guan 已提交
72
// Applies one raw record to the in-memory SDB. The per-action handling is not implemented yet:
// insert, update, delete and unknown actions are all accepted without doing anything.
// Always returns 0.
int32_t sdbWrite(SSdbRaw *pRaw) {
  // The lookup result is not consumed yet, but sdbGetHash() still sets terrno when the
  // table type is out of range or its hash object is missing.
  SHashObj *pHash = sdbGetHash(pRaw->type);
  (void)pHash;

  switch (pRaw->action) {
    case SDB_ACTION_INSERT:
    case SDB_ACTION_UPDATE:
    case SDB_ACTION_DELETE:
    default:
      break;
  }

  return 0;
}
S
Shengliang Guan 已提交
88

S
Shengliang Guan 已提交
89
// Placeholder for persisting version information to `fd`; writes nothing and returns 0.
static int32_t sdbWriteVersion(FileFd fd) {
  (void)fd;
  return 0;
}
S
Shengliang Guan 已提交
90

S
Shengliang Guan 已提交
91
// Placeholder for loading version information from `fd`; reads nothing and returns 0.
static int32_t sdbReadVersion(FileFd fd) {
  (void)fd;
  return 0;
}
S
Shengliang Guan 已提交
92

S
Shengliang Guan 已提交
93 94
static int32_t sdbReadDataFile() {
  int32_t code = 0;
S
Shengliang Guan 已提交
95

S
Shengliang Guan 已提交
96
  SSdbRaw *pRaw = malloc(SDB_MAX_SIZE);
S
Shengliang Guan 已提交
97
  if (pRaw == NULL) {
S
Shengliang Guan 已提交
98
    return TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
99 100 101 102 103 104 105 106 107 108 109 110 111
  }

  char file[PATH_MAX] = {0};
  snprintf(file, sizeof(file), "%ssdb.data", tsSdb.currDir);
  FileFd fd = taosOpenFileCreateWrite(file);
  if (fd <= 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    mError("failed to open file:%s for read since %s", file, tstrerror(code));
    return code;
  }

  int64_t offset = 0;
  while (1) {
S
Shengliang Guan 已提交
112
    int32_t ret = (int32_t)taosReadFile(fd, pRaw, sizeof(SSdbRaw));
S
Shengliang Guan 已提交
113 114 115 116 117 118
    if (ret == 0) break;

    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read file:%s since %s", file, tstrerror(code));
      break;
S
Shengliang Guan 已提交
119 120
    }

S
Shengliang Guan 已提交
121
    if (ret < sizeof(SSdbRaw)) {
S
Shengliang Guan 已提交
122
      code = TSDB_CODE_SDB_APP_ERROR;
S
Shengliang Guan 已提交
123 124
      mError("failed to read file:%s since %s", file, tstrerror(code));
      break;
S
Shengliang Guan 已提交
125 126
    }

S
Shengliang Guan 已提交
127 128 129
    code = sdbWrite(pRaw);
    if (code != 0) {
      mError("failed to read file:%s since %s", file, tstrerror(code));
S
Shengliang Guan 已提交
130 131 132 133 134 135 136
      goto PARSE_SDB_DATA_ERROR;
    }
  }

  code = 0;

PARSE_SDB_DATA_ERROR:
S
Shengliang Guan 已提交
137
  taosCloseFile(fd);
S
Shengliang Guan 已提交
138 139 140 141
  return code;
}

static int32_t sdbWriteDataFile() {
S
Shengliang Guan 已提交
142 143 144 145 146 147
  int32_t code = 0;

  char tmpfile[PATH_MAX] = {0};
  snprintf(tmpfile, sizeof(tmpfile), "%ssdb.data", tsSdb.tmpDir);

  FileFd fd = taosOpenFileCreateWrite(tmpfile);
S
Shengliang Guan 已提交
148
  if (fd <= 0) {
S
Shengliang Guan 已提交
149 150 151
    code = TAOS_SYSTEM_ERROR(errno);
    mError("failed to open file:%s for write since %s", tmpfile, tstrerror(code));
    return code;
S
Shengliang Guan 已提交
152 153
  }

S
Shengliang Guan 已提交
154
  for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) {
S
Shengliang Guan 已提交
155
    SHashObj *hash = tsSdb.hashObjs[i];
S
Shengliang Guan 已提交
156 157
    if (!hash) continue;

S
Shengliang Guan 已提交
158
    SdbEncodeFp encodeFp = tsSdb.encodeFps[i];
S
Shengliang Guan 已提交
159
    if (!encodeFp) continue;
S
Shengliang Guan 已提交
160

S
Shengliang Guan 已提交
161 162 163
    SSdbRow *pRow = taosHashIterate(hash, NULL);
    while (pRow != NULL) {
      if (pRow->status == SDB_STATUS_READY) continue;
S
Shengliang Guan 已提交
164
      SSdbRaw *pRaw = (*encodeFp)(pRow->pData);
S
Shengliang Guan 已提交
165
      if (pRaw != NULL) {
S
Shengliang Guan 已提交
166
        taosWriteFile(fd, pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
S
Shengliang Guan 已提交
167 168
      } else {
        taosHashCancelIterate(hash, pRow);
S
Shengliang Guan 已提交
169
        code = TSDB_CODE_SDB_APP_ERROR;
S
Shengliang Guan 已提交
170
        break;
S
Shengliang Guan 已提交
171 172
      }

S
Shengliang Guan 已提交
173
      pRow = taosHashIterate(hash, pRow);
S
Shengliang Guan 已提交
174 175 176
    }
  }

S
Shengliang Guan 已提交
177 178 179 180
  if (code == 0) {
    code = sdbWriteVersion(fd);
  }

S
Shengliang Guan 已提交
181 182
  taosCloseFile(fd);

S
Shengliang Guan 已提交
183 184 185
  if (code == 0) {
    code = taosFsyncFile(fd);
  }
S
Shengliang Guan 已提交
186 187

  if (code != 0) {
S
Shengliang Guan 已提交
188 189 190
    char curfile[PATH_MAX] = {0};
    snprintf(curfile, sizeof(curfile), "%ssdb.data", tsSdb.currDir);
    code = taosRenameFile(tmpfile, curfile);
S
Shengliang Guan 已提交
191 192
  }

S
Shengliang Guan 已提交
193 194 195 196 197 198 199
  if (code != 0) {
    mError("failed to write sdb file since %s", tstrerror(code));
  } else {
    mInfo("write sdb file successfully");
  }

  return code;
S
Shengliang Guan 已提交
200 201 202 203 204 205 206 207 208 209 210 211
}

// Loads the SDB from its data file.
// Returns 0 on success, or the non-zero code reported by sdbReadDataFile().
int32_t sdbRead() {
  int32_t code = sdbReadDataFile();
  if (code != 0) {
    return code;
  }

  mInfo("read sdb file successfully");
  return 0;  // success must not be reported as -1
}

S
Shengliang Guan 已提交
212 213 214 215 216 217 218 219 220
// Commits the in-memory SDB by writing the data file.
// Returns whatever sdbWriteDataFile() returns: 0 on success, its error code otherwise.
int32_t sdbCommit() {
  return sdbWriteDataFile();
}

S
Shengliang Guan 已提交
221 222 223 224 225 226 227 228 229 230 231 232 233
// First-time deployment: create the working directories, run every table's deploy callback,
// then commit the result to disk. The steps run in that order and the first failing step
// aborts the sequence. Returns 0 on success and -1 on any failure.
int32_t sdbDeploy() {
  if (sdbCreateDir() != 0 || sdbRunDeployFp() != 0 || sdbCommit() != 0) {
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
237 238 239
// Counterpart of sdbDeploy(); currently performs no work.
void sdbUnDeploy() {
}

int32_t sdbInit() {
S
Shengliang Guan 已提交
240 241
  char path[PATH_MAX + 100];

S
Shengliang Guan 已提交
242
  snprintf(path, PATH_MAX + 100, "%s%scur%s", tsMnodeDir, TD_DIRSEP, TD_DIRSEP);
S
Shengliang Guan 已提交
243
  tsSdb.currDir = strdup(path);
S
Shengliang Guan 已提交
244

S
Shengliang Guan 已提交
245 246 247 248 249 250 251
  snprintf(path, PATH_MAX + 100, "%s%ssync%s", tsMnodeDir, TD_DIRSEP, TD_DIRSEP);
  tsSdb.syncDir = strdup(path);

  snprintf(path, PATH_MAX + 100, "%s%stmp%s", tsMnodeDir, TD_DIRSEP, TD_DIRSEP);
  tsSdb.tmpDir = strdup(path);

  if (tsSdb.currDir == NULL || tsSdb.currDir == NULL || tsSdb.currDir == NULL) {
S
Shengliang Guan 已提交
252
    return TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
253 254 255
  }

  for (int32_t i = 0; i < SDB_MAX; ++i) {
S
Shengliang Guan 已提交
256
    int32_t type;
S
Shengliang Guan 已提交
257
    if (tsSdb.keyTypes[i] == SDB_KEY_INT32) {
S
Shengliang Guan 已提交
258
      type = TSDB_DATA_TYPE_INT;
S
Shengliang Guan 已提交
259
    } else if (tsSdb.keyTypes[i] == SDB_KEY_INT64) {
S
Shengliang Guan 已提交
260 261 262 263 264 265 266
      type = TSDB_DATA_TYPE_BIGINT;
    } else {
      type = TSDB_DATA_TYPE_BINARY;
    }

    SHashObj *hash = taosHashInit(128, taosGetDefaultHashFunction(type), true, HASH_NO_LOCK);
    if (hash == NULL) {
S
Shengliang Guan 已提交
267
      return TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
268 269
    }

S
Shengliang Guan 已提交
270 271
    tsSdb.hashObjs[i] = hash;
    taosInitRWLatch(&tsSdb.locks[i]);
S
Shengliang Guan 已提交
272 273 274 275 276 277
  }

  return 0;
}

// Shuts the SDB down: commits if the current version differs from the last committed one,
// then releases the directory path strings and every table's hash object.
void sdbCleanup() {
  if (tsSdb.curVer != tsSdb.lastCommitVer) {
    sdbCommit();
  }

  // Release the path strings in the same order they were created.
  if (tsSdb.currDir != NULL) {
    tfree(tsSdb.currDir);
  }
  if (tsSdb.syncDir != NULL) {
    tfree(tsSdb.syncDir);
  }
  if (tsSdb.tmpDir != NULL) {
    tfree(tsSdb.tmpDir);
  }

  // Destroy the per-table hash objects and clear their slots.
  for (int32_t sdb = 0; sdb < SDB_MAX; ++sdb) {
    if (tsSdb.hashObjs[sdb] == NULL) continue;

    taosHashCleanup(tsSdb.hashObjs[sdb]);
    tsSdb.hashObjs[sdb] = NULL;
  }
}

S
Shengliang Guan 已提交
303 304 305 306 307 308 309 310 311
// Registers a table description: stores its key type and all of its callbacks in the
// manager slots indexed by table.sdbType.
void sdbSetTable(SSdbTable table) {
  const ESdbType slot = table.sdbType;

  tsSdb.keyTypes[slot] = table.keyType;

  // Row life-cycle callbacks.
  tsSdb.insertFps[slot] = table.insertFp;
  tsSdb.updateFps[slot] = table.updateFp;
  tsSdb.deleteFps[slot] = table.deleteFp;

  // Deployment and (de)serialization callbacks.
  tsSdb.deployFps[slot] = table.deployFp;
  tsSdb.encodeFps[slot] = table.encodeFp;
  tsSdb.decodeFps[slot] = table.decodeFp;
}

#if 0
// NOTE(review): this region is compiled out. It is kept for reference and refers to names
// (SdbHead, SDB_AVAIL, tsSdb.dataSize) that are not used anywhere else in the visible file.

// Stores row `p` (an SdbHead followed by its key) in the hash of table `sdb` and returns the
// stored copy, or NULL when the table has no hash object.
void *sdbInsertRow(ESdbType sdb, void *p) {
  SdbHead *pHead = p;
  pHead->type = sdb;
  pHead->status = SDB_AVAIL;

  // The key starts right after the header structure, so the offset is the size of the
  // structure itself rather than the size of a pointer to it.
  char    *pKey = (char *)pHead + sizeof(*pHead);
  int32_t  keySize;
  EKeyType keyType = tsSdb.keyTypes[pHead->type];
  int32_t  dataSize = tsSdb.dataSize[pHead->type];

  SHashObj *hash = sdbGetHash(pHead->type);
  if (hash == NULL) {
    return NULL;
  }

  if (keyType == SDB_KEY_INT32) {
    keySize = sizeof(int32_t);
  } else if (keyType == SDB_KEY_BINARY) {
    keySize = strlen(pKey) + 1;
  } else {
    keySize = sizeof(int64_t);
  }

  taosHashPut(hash, pKey, keySize, pHead, dataSize);
  return taosHashGet(hash, pKey, keySize);
}

// Marks row `p` as dropped; the row is not removed from the hash.
void sdbDeleteRow(ESdbType sdb, void *p) {
  SdbHead *pHead = p;
  pHead->status = SDB_STATUS_DROPPED;
}

// Updating a row is implemented as re-inserting it.
void *sdbUpdateRow(ESdbType sdb, void *pHead) { return sdbInsertRow(sdb, pHead); }

#endif

// Looks up the row with key `pKey` in table `sdb` and takes a reference on it.
//
// The key length is derived from the table's key type: 4 bytes for SDB_KEY_INT32 (and for any
// unrecognized type), 8 bytes for SDB_KEY_INT64, strlen + 1 for SDB_KEY_BINARY.
//
// Returns the row's pData with refCount incremented when the row exists and is READY.
// Returns NULL when the table has no hash (terrno set by sdbGetHash), when the key is not found
// (terrno stays 0), or when the row is not READY (terrno = -1).
void *sdbAcquire(ESdbType sdb, void *pKey) {
  terrno = 0;

  SHashObj *pHash = sdbGetHash(sdb);
  if (pHash == NULL) return NULL;

  EKeyType keyType = tsSdb.keyTypes[sdb];
  int32_t  keyLen = sizeof(int32_t);  // SDB_KEY_INT32 and the fallback for unknown key types
  if (keyType == SDB_KEY_INT64) {
    keyLen = sizeof(int64_t);
  } else if (keyType == SDB_KEY_BINARY) {
    keyLen = strlen(pKey) + 1;
  }

  SSdbRow *pRow = taosHashGet(pHash, pKey, keyLen);
  if (pRow == NULL) return NULL;

  if (pRow->status != SDB_STATUS_READY) {
    terrno = -1;  // todo
    return NULL;
  }

  atomic_add_fetch_32(&pRow->refCount, 1);
  return pRow->pData;
}
S
Shengliang Guan 已提交
387

S
Shengliang Guan 已提交
388
void sdbRelease(void *pObj) {
S
Shengliang Guan 已提交
389 390
  SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow));
  atomic_sub_fetch_32(&pRow->refCount, 1);
S
Shengliang Guan 已提交
391 392
}

S
Shengliang Guan 已提交
393
// Advances an iteration over table `sdb`. `pIter` is handed to taosHashIterate() unchanged.
// Returns that call's result, or NULL when the table has no hash object.
void *sdbFetchRow(ESdbType sdb, void *pIter) {
  SHashObj *pHash = sdbGetHash(sdb);

  return (pHash != NULL) ? taosHashIterate(pHash, pIter) : NULL;
}

S
Shengliang Guan 已提交
402
// Cancels an in-progress iteration over table `sdb`; does nothing when the table has no hash object.
void sdbCancelFetch(ESdbType sdb, void *pIter) {
  SHashObj *pHash = sdbGetHash(sdb);

  if (pHash != NULL) {
    taosHashCancelIterate(pHash, pIter);
  }
}

S
Shengliang Guan 已提交
410
// Returns the size reported by taosHashGetSize() for table `sdb`, or 0 when the table has no hash object.
int32_t sdbGetSize(ESdbType sdb) {
  SHashObj *pHash = sdbGetHash(sdb);

  return (pHash == NULL) ? 0 : taosHashGetSize(pHash);
}