sdb.c 8.8 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "sdbInt.h"
S
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19
static SSdbObj tsSdb = {0};
S
Shengliang Guan 已提交
20

S
Shengliang Guan 已提交
21 22 23 24 25
static int32_t sdbCreateDir() {
  if (!taosMkDir(tsSdb.currDir)) {
    mError("failed to create dir:%s", tsSdb.currDir);
    return TAOS_SYSTEM_ERROR(errno);
  }
S
Shengliang Guan 已提交
26

S
Shengliang Guan 已提交
27 28
  if (!taosMkDir(tsSdb.syncDir)) {
    mError("failed to create dir:%s", tsSdb.syncDir);
S
Shengliang Guan 已提交
29 30 31 32 33 34 35 36 37 38 39 40
    return -1;
  }

  if (!taosMkDir(tsSdb.tmpDir)) {
    mError("failed to create dir:%s", tsSdb.tmpDir);
    return -1;
  }

  return 0;
}

static int32_t sdbRunDeployFp() {
S
Shengliang Guan 已提交
41 42
  for (int32_t i = SDB_START; i < SDB_MAX; ++i) {
    SdbDeployFp fp = tsSdb.deployFps[i];
S
Shengliang Guan 已提交
43 44 45 46 47 48 49 50
    if (fp) {
      (*fp)();
    }
  }

  return 0;
}

S
Shengliang Guan 已提交
51 52 53
static SHashObj *sdbGetHash(int32_t sdb) {
  if (sdb >= SDB_MAX || sdb <= SDB_START) {
    return NULL;
S
Shengliang Guan 已提交
54 55
  }

S
Shengliang Guan 已提交
56 57 58 59
  SHashObj *hash = tsSdb.hashObjs[sdb];
  if (hash == NULL) {
    return NULL;
  }
S
Shengliang Guan 已提交
60

S
Shengliang Guan 已提交
61
  return hash;
S
Shengliang Guan 已提交
62 63
}

S
Shengliang Guan 已提交
64
int32_t sdbWrite(SSdbRaw *pRaw) {
S
Shengliang Guan 已提交
65 66 67 68 69 70 71 72
  SHashObj *hash = sdbGetHash(pRaw->type);
  switch (pRaw->action) {
    case SDB_ACTION_INSERT:
      break;
    case SDB_ACTION_UPDATE:
      break;
    case SDB_ACTION_DELETE:
      break;
S
Shengliang Guan 已提交
73

S
Shengliang Guan 已提交
74 75
    default:
      break;
S
Shengliang Guan 已提交
76 77
  }

S
Shengliang Guan 已提交
78 79
  return 0;
}
S
Shengliang Guan 已提交
80

S
Shengliang Guan 已提交
81
static int32_t sdbWriteVersion(FileFd fd) { return 0; }
S
Shengliang Guan 已提交
82

S
Shengliang Guan 已提交
83
static int32_t sdbReadVersion(FileFd fd) { return 0; }
S
Shengliang Guan 已提交
84

S
Shengliang Guan 已提交
85 86
static int32_t sdbReadDataFile() {
  int32_t code = 0;
S
Shengliang Guan 已提交
87

S
Shengliang Guan 已提交
88
  SSdbRaw *pRaw = malloc(SDB_MAX_SIZE);
S
Shengliang Guan 已提交
89
  if (pRaw == NULL) {
S
Shengliang Guan 已提交
90
    return TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
91 92 93 94 95 96 97 98 99 100 101 102 103
  }

  char file[PATH_MAX] = {0};
  snprintf(file, sizeof(file), "%ssdb.data", tsSdb.currDir);
  FileFd fd = taosOpenFileCreateWrite(file);
  if (fd <= 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    mError("failed to open file:%s for read since %s", file, tstrerror(code));
    return code;
  }

  int64_t offset = 0;
  while (1) {
S
Shengliang Guan 已提交
104
    int32_t ret = (int32_t)taosReadFile(fd, pRaw, sizeof(SSdbRaw));
S
Shengliang Guan 已提交
105 106 107 108 109 110
    if (ret == 0) break;

    if (ret < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      mError("failed to read file:%s since %s", file, tstrerror(code));
      break;
S
Shengliang Guan 已提交
111 112
    }

S
Shengliang Guan 已提交
113
    if (ret < sizeof(SSdbRaw)) {
S
Shengliang Guan 已提交
114 115 116
      code = TSDB_CODE_SDB_INTERNAL_ERROR;
      mError("failed to read file:%s since %s", file, tstrerror(code));
      break;
S
Shengliang Guan 已提交
117 118
    }

S
Shengliang Guan 已提交
119 120 121
    code = sdbWrite(pRaw);
    if (code != 0) {
      mError("failed to read file:%s since %s", file, tstrerror(code));
S
Shengliang Guan 已提交
122 123 124 125 126 127 128
      goto PARSE_SDB_DATA_ERROR;
    }
  }

  code = 0;

PARSE_SDB_DATA_ERROR:
S
Shengliang Guan 已提交
129
  taosCloseFile(fd);
S
Shengliang Guan 已提交
130 131 132 133
  return code;
}

static int32_t sdbWriteDataFile() {
S
Shengliang Guan 已提交
134 135 136 137 138 139
  int32_t code = 0;

  char tmpfile[PATH_MAX] = {0};
  snprintf(tmpfile, sizeof(tmpfile), "%ssdb.data", tsSdb.tmpDir);

  FileFd fd = taosOpenFileCreateWrite(tmpfile);
S
Shengliang Guan 已提交
140
  if (fd <= 0) {
S
Shengliang Guan 已提交
141 142 143
    code = TAOS_SYSTEM_ERROR(errno);
    mError("failed to open file:%s for write since %s", tmpfile, tstrerror(code));
    return code;
S
Shengliang Guan 已提交
144 145
  }

S
Shengliang Guan 已提交
146
  for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) {
S
Shengliang Guan 已提交
147
    SHashObj *hash = tsSdb.hashObjs[i];
S
Shengliang Guan 已提交
148 149
    if (!hash) continue;

S
Shengliang Guan 已提交
150
    SdbEncodeFp encodeFp = tsSdb.encodeFps[i];
S
Shengliang Guan 已提交
151
    if (!encodeFp) continue;
S
Shengliang Guan 已提交
152

S
Shengliang Guan 已提交
153 154 155
    SSdbRow *pRow = taosHashIterate(hash, NULL);
    while (pRow != NULL) {
      if (pRow->status == SDB_STATUS_READY) continue;
S
Shengliang Guan 已提交
156
      SSdbRaw *pRaw = (*encodeFp)(pRow->data);
S
Shengliang Guan 已提交
157
      if (pRaw != NULL) {
S
Shengliang Guan 已提交
158
        taosWriteFile(fd, pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
S
Shengliang Guan 已提交
159 160 161 162
      } else {
        taosHashCancelIterate(hash, pRow);
        code = TSDB_CODE_SDB_INTERNAL_ERROR;
        break;
S
Shengliang Guan 已提交
163 164
      }

S
Shengliang Guan 已提交
165
      pRow = taosHashIterate(hash, pRow);
S
Shengliang Guan 已提交
166 167 168
    }
  }

S
Shengliang Guan 已提交
169 170 171 172
  if (code == 0) {
    code = sdbWriteVersion(fd);
  }

S
Shengliang Guan 已提交
173 174
  taosCloseFile(fd);

S
Shengliang Guan 已提交
175 176 177
  if (code == 0) {
    code = taosFsyncFile(fd);
  }
S
Shengliang Guan 已提交
178 179

  if (code != 0) {
S
Shengliang Guan 已提交
180 181 182
    char curfile[PATH_MAX] = {0};
    snprintf(curfile, sizeof(curfile), "%ssdb.data", tsSdb.currDir);
    code = taosRenameFile(tmpfile, curfile);
S
Shengliang Guan 已提交
183 184
  }

S
Shengliang Guan 已提交
185 186 187 188 189 190 191
  if (code != 0) {
    mError("failed to write sdb file since %s", tstrerror(code));
  } else {
    mInfo("write sdb file successfully");
  }

  return code;
S
Shengliang Guan 已提交
192 193 194 195 196 197 198 199 200 201 202 203
}

int32_t sdbRead() {
  int32_t code = sdbReadDataFile();
  if (code != 0) {
    return code;
  }

  mInfo("read sdb file successfully");
  return -1;
}

S
Shengliang Guan 已提交
204 205 206 207 208 209 210 211 212
int32_t sdbCommit() {
  int32_t code = sdbWriteDataFile();
  if (code != 0) {
    return code;
  }

  return 0;
}

S
Shengliang Guan 已提交
213 214 215 216 217 218 219 220 221 222 223 224 225
int32_t sdbDeploy() {
  if (sdbCreateDir() != 0) {
    return -1;
  }

  if (sdbRunDeployFp() != 0) {
    return -1;
  }

  if (sdbCommit() != 0) {
    return -1;
  }

S
Shengliang Guan 已提交
226 227 228
  return 0;
}

S
Shengliang Guan 已提交
229 230 231
void sdbUnDeploy() {}

int32_t sdbInit() {
S
Shengliang Guan 已提交
232 233 234 235
  char path[PATH_MAX + 100];

  snprintf(path, PATH_MAX + 100, "%s%scurrent%s", tsMnodeDir, TD_DIRSEP, TD_DIRSEP);
  tsSdb.currDir = strdup(path);
S
Shengliang Guan 已提交
236

S
Shengliang Guan 已提交
237 238 239 240 241 242 243
  snprintf(path, PATH_MAX + 100, "%s%ssync%s", tsMnodeDir, TD_DIRSEP, TD_DIRSEP);
  tsSdb.syncDir = strdup(path);

  snprintf(path, PATH_MAX + 100, "%s%stmp%s", tsMnodeDir, TD_DIRSEP, TD_DIRSEP);
  tsSdb.tmpDir = strdup(path);

  if (tsSdb.currDir == NULL || tsSdb.currDir == NULL || tsSdb.currDir == NULL) {
S
Shengliang Guan 已提交
244
    return TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
245 246 247
  }

  for (int32_t i = 0; i < SDB_MAX; ++i) {
S
Shengliang Guan 已提交
248
    int32_t type;
S
Shengliang Guan 已提交
249
    if (tsSdb.keyTypes[i] == SDB_KEY_INT32) {
S
Shengliang Guan 已提交
250
      type = TSDB_DATA_TYPE_INT;
S
Shengliang Guan 已提交
251
    } else if (tsSdb.keyTypes[i] == SDB_KEY_INT64) {
S
Shengliang Guan 已提交
252 253 254 255 256 257 258
      type = TSDB_DATA_TYPE_BIGINT;
    } else {
      type = TSDB_DATA_TYPE_BINARY;
    }

    SHashObj *hash = taosHashInit(128, taosGetDefaultHashFunction(type), true, HASH_NO_LOCK);
    if (hash == NULL) {
S
Shengliang Guan 已提交
259
      return TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
260 261
    }

S
Shengliang Guan 已提交
262 263
    tsSdb.hashObjs[i] = hash;
    taosInitRWLatch(&tsSdb.locks[i]);
S
Shengliang Guan 已提交
264 265 266 267 268 269
  }

  return 0;
}

void sdbCleanup() {
S
Shengliang Guan 已提交
270 271
  if (tsSdb.curVer != tsSdb.lastCommitVer) {
    sdbCommit();
S
Shengliang Guan 已提交
272 273
  }

S
Shengliang Guan 已提交
274 275 276
  if (tsSdb.currDir != NULL) {
    tfree(tsSdb.currDir);
  }
S
Shengliang Guan 已提交
277

S
Shengliang Guan 已提交
278 279
  if (tsSdb.syncDir != NULL) {
    tfree(tsSdb.syncDir);
S
Shengliang Guan 已提交
280 281
  }

S
Shengliang Guan 已提交
282 283
  if (tsSdb.tmpDir != NULL) {
    tfree(tsSdb.tmpDir);
S
Shengliang Guan 已提交
284 285
  }

S
Shengliang Guan 已提交
286 287 288 289 290 291 292
  for (int32_t i = 0; i < SDB_MAX; ++i) {
    SHashObj *hash = tsSdb.hashObjs[i];
    if (hash != NULL) {
      taosHashCleanup(hash);
    }
    tsSdb.hashObjs[i] = NULL;
  }
S
Shengliang Guan 已提交
293 294
}

S
Shengliang Guan 已提交
295 296 297 298 299 300 301 302 303 304 305 306 307
void sdbSetHandler(SSdbDesc desc) {
  ESdbType sdb = desc.sdbType;
  tsSdb.keyTypes[sdb] = desc.keyType;
  tsSdb.insertFps[sdb] = desc.insertFp;
  tsSdb.updateFps[sdb] = desc.updateFp;
  tsSdb.deleteFps[sdb] = desc.deleteFp;
  tsSdb.deployFps[sdb] = desc.deployFp;
  tsSdb.encodeFps[sdb] = desc.encodeFp;
  tsSdb.decodeFps[sdb] = desc.decodeFp;
}

#if 0
void *sdbInsertRow(ESdbType sdb, void *p) {
S
Shengliang Guan 已提交
308 309
  SdbHead *pHead = p;
  pHead->type = sdb;
S
Shengliang Guan 已提交
310
  pHead->status = SDB_AVAIL;
S
Shengliang Guan 已提交
311

S
Shengliang Guan 已提交
312 313 314 315
  char    *pKey = (char *)pHead + sizeof(pHead);
  int32_t  keySize;
  EKeyType keyType = tsSdb.keyTypes[pHead->type];
  int32_t  dataSize = tsSdb.dataSize[pHead->type];
S
Shengliang Guan 已提交
316 317 318 319 320 321

  SHashObj *hash = sdbGetHash(pHead->type);
  if (hash == NULL) {
    return NULL;
  }

S
Shengliang Guan 已提交
322
  if (keyType == SDBINT32) {
S
Shengliang Guan 已提交
323
    keySize = sizeof(int32_t);
S
Shengliang Guan 已提交
324
  } else if (keyType == SDB_KEY_BINARY) {
S
Shengliang Guan 已提交
325 326 327 328 329 330 331 332 333
    keySize = strlen(pKey) + 1;
  } else {
    keySize = sizeof(int64_t);
  }

  taosHashPut(hash, pKey, keySize, pHead, dataSize);
  return taosHashGet(hash, pKey, keySize);
}

S
Shengliang Guan 已提交
334
void sdbDeleteRow(ESdbType sdb, void *p) {
S
Shengliang Guan 已提交
335
  SdbHead *pHead = p;
S
Shengliang Guan 已提交
336
  pHead->status = SDB_STATUS_DROPPED;
S
Shengliang Guan 已提交
337 338
}

S
Shengliang Guan 已提交
339 340 341 342 343 344
void *sdbUpdateRow(ESdbType sdb, void *pHead) { return sdbInsertRow(sdb, pHead); }

#endif

void *sdbAcquire(ESdbType sdb, void *pKey) {
  terrno = 0;
S
Shengliang Guan 已提交
345 346 347 348 349 350

  SHashObj *hash = sdbGetHash(sdb);
  if (hash == NULL) {
    return NULL;
  }

S
Shengliang Guan 已提交
351 352
  int32_t  keySize;
  EKeyType keyType = tsSdb.keyTypes[sdb];
S
Shengliang Guan 已提交
353

S
Shengliang Guan 已提交
354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373
  switch (keyType) {
    case SDB_KEY_INT32:
      keySize = sizeof(int32_t);
      break;
    case SDB_KEY_INT64:
      keySize = sizeof(int64_t);
      break;
    case SDB_KEY_BINARY:
      keySize = strlen(pKey) + 1;
      break;
    default:
      keySize = sizeof(int32_t);
  }

  SSdbRow *pRow = taosHashGet(hash, pKey, keySize);
  if (pRow == NULL) return NULL;

  if (pRow->status == SDB_STATUS_READY) {
    atomic_add_fetch_32(&pRow->refCount, 1);
    return pRow->data;
S
Shengliang Guan 已提交
374
  } else {
S
Shengliang Guan 已提交
375 376
    terrno = -1;  // todo
    return NULL;
S
Shengliang Guan 已提交
377
  }
S
Shengliang Guan 已提交
378
}
S
Shengliang Guan 已提交
379

S
Shengliang Guan 已提交
380
void sdbRelease(void *pObj) {
S
Shengliang Guan 已提交
381 382
  SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow));
  atomic_sub_fetch_32(&pRow->refCount, 1);
S
Shengliang Guan 已提交
383 384
}

S
Shengliang Guan 已提交
385
void *sdbFetchRow(ESdbType sdb, void *pIter) {
S
Shengliang Guan 已提交
386 387 388 389 390 391 392 393
  SHashObj *hash = sdbGetHash(sdb);
  if (hash == NULL) {
    return NULL;
  }

  return taosHashIterate(hash, pIter);
}

S
Shengliang Guan 已提交
394
void sdbCancelFetch(ESdbType sdb, void *pIter) {
S
Shengliang Guan 已提交
395 396 397 398 399 400 401
  SHashObj *hash = sdbGetHash(sdb);
  if (hash == NULL) {
    return;
  }
  taosHashCancelIterate(hash, pIter);
}

S
Shengliang Guan 已提交
402
int32_t sdbGetSize(ESdbType sdb) {
S
Shengliang Guan 已提交
403 404 405 406 407 408
  SHashObj *hash = sdbGetHash(sdb);
  if (hash == NULL) {
    return 0;
  }
  return taosHashGetSize(hash);
}