sdb.c 5.9 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "sdb.h"
S
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19 20
static int32_t sdbCreateDir(SSdb *pSdb);

S
Shengliang Guan 已提交
21
SSdb *sdbInit(SSdbOpt *pOption) {
22
  mInfo("start to init sdb in %s", pOption->path);
S
Shengliang Guan 已提交
23

wafwerar's avatar
wafwerar 已提交
24
  SSdb *pSdb = taosMemoryCalloc(1, sizeof(SSdb));
25 26
  if (pSdb == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
27
    mError("failed to init sdb since %s", terrstr());
28 29
    return NULL;
  }
S
Shengliang Guan 已提交
30

S
Shengliang Guan 已提交
31 32
  char path[PATH_MAX + 100] = {0};
  snprintf(path, sizeof(path), "%s%sdata", pOption->path, TD_DIRSEP);
33
  pSdb->currDir = strdup(path);
S
Shengliang Guan 已提交
34
  snprintf(path, sizeof(path), "%s%stmp", pOption->path, TD_DIRSEP);
35
  pSdb->tmpDir = strdup(path);
36
  if (pSdb->currDir == NULL || pSdb->tmpDir == NULL) {
S
Shengliang Guan 已提交
37
    sdbCleanup(pSdb);
38
    terrno = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
39
    mError("failed to init sdb since %s", terrstr());
40
    return NULL;
S
Shengliang Guan 已提交
41 42
  }

S
Shengliang Guan 已提交
43 44 45 46 47
  if (sdbCreateDir(pSdb) != 0) {
    sdbCleanup(pSdb);
    return NULL;
  }

48
  for (ESdbType i = 0; i < SDB_MAX; ++i) {
49
    taosThreadRwlockInit(&pSdb->locks[i], NULL);
S
Shengliang Guan 已提交
50
    pSdb->maxId[i] = 0;
S
Shengliang Guan 已提交
51
    pSdb->tableVer[i] = 0;
S
Shengliang Guan 已提交
52
    pSdb->keyTypes[i] = SDB_KEY_INT32;
S
Shengliang Guan 已提交
53 54
  }

55
  pSdb->pWal = pOption->pWal;
S
Shengliang Guan 已提交
56 57 58 59 60 61
  pSdb->applyIndex = -1;
  pSdb->applyTerm = -1;
  pSdb->applyConfig = -1;
  pSdb->commitIndex = -1;
  pSdb->commitTerm = -1;
  pSdb->commitConfig = -1;
S
Shengliang Guan 已提交
62
  pSdb->pMnode = pOption->pMnode;
S
Shengliang Guan 已提交
63
  taosThreadMutexInit(&pSdb->filelock, NULL);
64
  mInfo("sdb init success");
S
Shengliang Guan 已提交
65
  return pSdb;
S
Shengliang Guan 已提交
66 67
}

S
Shengliang Guan 已提交
68
void sdbCleanup(SSdb *pSdb) {
69
  mInfo("start to cleanup sdb");
S
Shengliang Guan 已提交
70

71
  sdbWriteFile(pSdb, 0);
S
Shengliang Guan 已提交
72

73
  if (pSdb->currDir != NULL) {
wafwerar's avatar
wafwerar 已提交
74
    taosMemoryFreeClear(pSdb->currDir);
S
Shengliang Guan 已提交
75
  }
S
Shengliang Guan 已提交
76

77
  if (pSdb->tmpDir != NULL) {
S
Shengliang Guan 已提交
78
    taosRemoveDir(pSdb->tmpDir);
wafwerar's avatar
wafwerar 已提交
79
    taosMemoryFreeClear(pSdb->tmpDir);
S
Shengliang Guan 已提交
80 81
  }

82
  for (ESdbType i = 0; i < SDB_MAX; ++i) {
83
    SHashObj *hash = pSdb->hashObjs[i];
84 85
    if (hash == NULL) continue;

S
Shengliang Guan 已提交
86
    SSdbRow **ppRow = taosHashIterate(hash, NULL);
87 88 89 90
    while (ppRow != NULL) {
      SSdbRow *pRow = *ppRow;
      if (pRow == NULL) continue;

91
      sdbFreeRow(pSdb, pRow, true);
92
      ppRow = taosHashIterate(hash, ppRow);
S
Shengliang Guan 已提交
93
    }
94 95 96 97 98 99 100 101
  }

  for (ESdbType i = 0; i < SDB_MAX; ++i) {
    SHashObj *hash = pSdb->hashObjs[i];
    if (hash == NULL) continue;

    taosHashClear(hash);
    taosHashCleanup(hash);
102
    taosThreadRwlockDestroy(&pSdb->locks[i]);
103
    pSdb->hashObjs[i] = NULL;
104 105
    memset(&pSdb->locks[i], 0, sizeof(pSdb->locks[i]));

106
    mInfo("sdb table:%s is cleaned up", sdbTableName(i));
S
Shengliang Guan 已提交
107
  }
S
Shengliang Guan 已提交
108

S
Shengliang Guan 已提交
109
  taosThreadMutexDestroy(&pSdb->filelock);
wafwerar's avatar
wafwerar 已提交
110
  taosMemoryFree(pSdb);
111
  mInfo("sdb is cleaned up");
S
Shengliang Guan 已提交
112 113
}

S
Shengliang Guan 已提交
114
int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) {
S
Shengliang Guan 已提交
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
  ESdbType sdbType = table.sdbType;
  EKeyType keyType = table.keyType;
  pSdb->keyTypes[sdbType] = table.keyType;
  pSdb->insertFps[sdbType] = table.insertFp;
  pSdb->updateFps[sdbType] = table.updateFp;
  pSdb->deleteFps[sdbType] = table.deleteFp;
  pSdb->deployFps[sdbType] = table.deployFp;
  pSdb->encodeFps[sdbType] = table.encodeFp;
  pSdb->decodeFps[sdbType] = table.decodeFp;

  int32_t hashType = 0;
  if (keyType == SDB_KEY_INT32) {
    hashType = TSDB_DATA_TYPE_INT;
  } else if (keyType == SDB_KEY_INT64) {
    hashType = TSDB_DATA_TYPE_BIGINT;
  } else {
    hashType = TSDB_DATA_TYPE_BINARY;
  }
S
Shengliang Guan 已提交
133

S
Shengliang Guan 已提交
134
  SHashObj *hash = taosHashInit(64, taosGetDefaultHashFunction(hashType), true, HASH_ENTRY_LOCK);
S
Shengliang Guan 已提交
135 136 137
  if (hash == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
S
Shengliang Guan 已提交
138 139
  }

S
Shengliang Guan 已提交
140
  pSdb->maxId[sdbType] = 0;
S
Shengliang Guan 已提交
141
  pSdb->hashObjs[sdbType] = hash;
142
  mInfo("sdb table:%s is initialized", sdbTableName(sdbType));
S
Shengliang Guan 已提交
143

S
Shengliang Guan 已提交
144
  return 0;
S
Shengliang Guan 已提交
145 146 147
}

static int32_t sdbCreateDir(SSdb *pSdb) {
S
Shengliang Guan 已提交
148
  if (taosMulMkDir(pSdb->currDir) != 0) {
S
Shengliang Guan 已提交
149 150 151 152 153 154 155 156 157 158 159 160 161
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to create dir:%s since %s", pSdb->currDir, terrstr());
    return -1;
  }

  if (taosMkDir(pSdb->tmpDir) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    mError("failed to create dir:%s since %s", pSdb->tmpDir, terrstr());
    return -1;
  }

  return 0;
}
S
Shengliang Guan 已提交
162

163
void sdbSetApplyInfo(SSdb *pSdb, int64_t index, int64_t term, int64_t config) {
164 165
#if 1
  mTrace("mnode apply info changed from index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " to index:%" PRId64
166 167
         " term:%" PRId64 " config:%" PRId64,
         pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig, index, term, config);
168
#endif
169
  pSdb->applyIndex = index;
S
Shengliang Guan 已提交
170
  pSdb->applyTerm = term;
171
  pSdb->applyConfig = config;
S
Shengliang Guan 已提交
172
}
S
Shengliang Guan 已提交
173

174 175 176 177
void sdbGetCommitInfo(SSdb *pSdb, int64_t *index, int64_t *term, int64_t *config) {
  *index = pSdb->commitIndex;
  *term = pSdb->commitTerm;
  *config = pSdb->commitConfig;
178
#if 1
179 180 181
  mTrace("mnode current info, apply index:%" PRId64 " term:%" PRId64 " config:%" PRId64 ", commit index:%" PRId64
         " term:%" PRId64 " config:%" PRId64,
         pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig, *index, *term, *config);
182
#endif
183
}
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203

void sdbWriteLock(SSdb *pSdb, int32_t type) {
  TdThreadRwlock *pLock = &pSdb->locks[type];
  // mTrace("sdb table:%d start write lock:%p", type, pLock);
  taosThreadRwlockWrlock(pLock);
  // mTrace("sdb table:%d stop write lock:%p", type, pLock);
}

void sdbReadLock(SSdb *pSdb, int32_t type) {
  TdThreadRwlock *pLock = &pSdb->locks[type];
  // mTrace("sdb table:%d start read lock:%p", type, pLock);
  taosThreadRwlockRdlock(pLock);
  // mTrace("sdb table:%d stop read lock:%p", type, pLock);
}

void sdbUnLock(SSdb *pSdb, int32_t type) {
  TdThreadRwlock *pLock = &pSdb->locks[type];
  // mTrace("sdb table:%d unlock:%p", type, pLock);
  taosThreadRwlockUnlock(pLock);
}