sdbHash.c 8.1 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "sdbInt.h"

S
Shengliang Guan 已提交
19 20
/*
 * Return the hash table that holds the rows of the given sdb type.
 *
 * On failure NULL is returned and terrno is set:
 *   - TSDB_CODE_SDB_INVALID_TABLE_TYPE when `type` is not strictly between
 *     SDB_START and SDB_MAX;
 *   - TSDB_CODE_SDB_APP_ERROR when the slot for `type` holds no table.
 */
static SHashObj *sdbGetHash(SSdb *pSdb, int32_t type) {
  if (!(type > SDB_START && type < SDB_MAX)) {
    terrno = TSDB_CODE_SDB_INVALID_TABLE_TYPE;
    return NULL;
  }

  SHashObj *pTable = pSdb->hashObjs[type];
  if (pTable == NULL) {
    terrno = TSDB_CODE_SDB_APP_ERROR;
  }

  return pTable;
}

S
Shengliang Guan 已提交
34
/*
 * Compute the number of key bytes used for hashing a row of the given type.
 *
 * SDB_KEY_INT32 keys use sizeof(int32_t); SDB_KEY_BINARY keys are treated as
 * C strings and include the terminating NUL; every other key type uses
 * sizeof(int64_t). `pKey` is only read for SDB_KEY_BINARY.
 */
static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, void *pKey) {
  switch (pSdb->keyTypes[type]) {
    case SDB_KEY_INT32:
      return sizeof(int32_t);
    case SDB_KEY_BINARY:
      return strlen(pKey) + 1;
    default:
      return sizeof(int64_t);
  }
}

S
Shengliang Guan 已提交
49 50 51
/*
 * Insert a freshly decoded row into the hash table of its type.
 *
 * The row is stored with refCount 1 and the status carried by `pRaw`. If a
 * row with the same key is already stored, or the hash insert fails, `pRow`
 * is freed and TSDB_CODE_SDB_OBJ_ALREADY_THERE is returned (and set in
 * terrno). After a successful insert the per-type maxId is raised for
 * SDB_KEY_INT32 tables and the type's insert callback, if registered, is
 * invoked outside the latch; if the callback fails the row is removed again,
 * freed, and the callback's code is returned (and set in terrno).
 *
 * Returns 0 on success.
 */
static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
  SRWLatch *pLatch = &pSdb->locks[pRow->type];

  taosWLockLatch(pLatch);

  // A key that is already present and a failed put are reported the same way.
  int32_t rejected = (taosHashGet(hash, pRow->pObj, keySize) != NULL);
  if (!rejected) {
    pRow->refCount = 1;
    pRow->status = pRaw->status;
    rejected = (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0);
  }

  if (rejected) {
    taosWUnLockLatch(pLatch);
    sdbFreeRow(pRow);
    terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE;
    return terrno;
  }

  taosWUnLockLatch(pLatch);

  // For int32-keyed tables the key is the leading int32 of the object.
  if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT32) {
    pSdb->maxId[pRow->type] = MAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj));
  }

  SdbInsertFp insertFp = pSdb->insertFps[pRow->type];
  if (insertFp == NULL) {
    return 0;
  }

  int32_t cbCode = (*insertFp)(pSdb, pRow->pObj);
  if (cbCode == 0) {
    return 0;
  }

  // The callback refused the row: undo the insert.
  taosWLockLatch(pLatch);
  taosHashRemove(hash, pRow->pObj, keySize);
  taosWUnLockLatch(pLatch);
  sdbFreeRow(pRow);
  terrno = cbCode;
  return terrno;
}

S
Shengliang Guan 已提交
95
/*
 * Apply an update record to the row stored under the key of `pNewRow`.
 *
 * If no row is stored under that key, the call is forwarded to
 * sdbInsertRow() and its result is returned. Otherwise the stored row takes
 * the status carried by `pRaw`, the type's update callback (when registered)
 * is invoked with the stored object and the new object, and `pNewRow` is
 * freed.
 *
 * Returns 0 when no update callback is registered, otherwise the callback's
 * result.
 */
static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pNewRow, int32_t keySize) {
  int32_t code = 0;

  // The stored row's status is written below, so the latch must be held in
  // write mode: a shared (read) hold would allow sdbAcquire()/sdbFetch() and
  // other updaters to access the status concurrently with this write.
  SRWLatch *pLock = &pSdb->locks[pNewRow->type];
  taosWLockLatch(pLock);

  SSdbRow **ppOldRow = taosHashGet(hash, pNewRow->pObj, keySize);
  if (ppOldRow == NULL || *ppOldRow == NULL) {
    // sdbInsertRow() takes the write latch itself, so release it first.
    taosWUnLockLatch(pLock);
    return sdbInsertRow(pSdb, hash, pRaw, pNewRow, keySize);
  }
  SSdbRow *pOldRow = *ppOldRow;

  pOldRow->status = pRaw->status;
  taosWUnLockLatch(pLock);

  // The callback runs outside the latch, as in the insert path.
  SdbUpdateFp updateFp = pSdb->updateFps[pNewRow->type];
  if (updateFp != NULL) {
    code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj);
  }

  sdbFreeRow(pNewRow);
  return code;
}

S
Shengliang Guan 已提交
120 121 122
/*
 * Apply a delete record: detach the row stored under the key of `pRow`.
 *
 * The stored row takes the status carried by `pRaw`, is removed from the
 * hash table under the write latch, and then has one reference dropped via
 * sdbRelease(). `pRow` (the decoded copy) is always freed.
 *
 * Returns 0 on success, or TSDB_CODE_SDB_OBJ_NOT_THERE (also set in terrno)
 * when no row is stored under that key.
 */
static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
  SRWLatch *pLatch = &pSdb->locks[pRow->type];

  taosWLockLatch(pLatch);

  SSdbRow **ppFound = taosHashGet(hash, pRow->pObj, keySize);
  SSdbRow  *pStored = (ppFound != NULL) ? *ppFound : NULL;
  if (pStored == NULL) {
    taosWUnLockLatch(pLatch);
    sdbFreeRow(pRow);
    terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
    return terrno;
  }

  pStored->status = pRaw->status;
  taosHashRemove(hash, pStored->pObj, keySize);

  taosWUnLockLatch(pLatch);

  // sdbRelease() takes the latch itself, so it is called after unlocking.
  sdbRelease(pSdb, pStored->pObj);
  sdbFreeRow(pRow);

  return 0;
}

S
Shengliang Guan 已提交
144
/*
 * Decode a raw record and apply it to the in-memory table of its type.
 *
 * The record's status selects the operation:
 *   - SDB_STATUS_CREATING                      -> sdbInsertRow()
 *   - SDB_STATUS_UPDATING / READY / DROPPING   -> sdbUpdateRow()
 *   - SDB_STATUS_DROPPED                       -> sdbDeleteRow()
 * Any other status yields TSDB_CODE_SDB_INVALID_ACTION_TYPE.
 *
 * `pRaw` is not freed by this function. Returns 0 on success or an error
 * code; when table lookup or decoding fails the current terrno is returned.
 */
int32_t sdbWriteNotFree(SSdb *pSdb, SSdbRaw *pRaw) {
  SHashObj *hash = sdbGetHash(pSdb, pRaw->type);
  if (hash == NULL) return terrno;

  SdbDecodeFp decodeFp = pSdb->decodeFps[pRaw->type];
  SSdbRow    *pRow = (*decodeFp)(pRaw);
  if (pRow == NULL) {
    return terrno;
  }

  pRow->type = pRaw->type;

  int32_t keySize = sdbGetkeySize(pSdb, pRow->type, pRow->pObj);
  int32_t code = TSDB_CODE_SDB_INVALID_ACTION_TYPE;

  // Each handler below either stores pRow in the table or frees it.
  switch (pRaw->status) {
    case SDB_STATUS_CREATING:
      code = sdbInsertRow(pSdb, hash, pRaw, pRow, keySize);
      break;
    case SDB_STATUS_UPDATING:
    case SDB_STATUS_READY:
    case SDB_STATUS_DROPPING:
      code = sdbUpdateRow(pSdb, hash, pRaw, pRow, keySize);
      break;
    case SDB_STATUS_DROPPED:
      code = sdbDeleteRow(pSdb, hash, pRaw, pRow, keySize);
      break;
    default:
      // No handler received the decoded row, so it must be freed here;
      // otherwise it would leak. terrno mirrors the returned code like the
      // other error paths.
      sdbFreeRow(pRow);
      terrno = code;
      break;
  }

  return code;
}

S
Shengliang Guan 已提交
176
/*
 * Apply a raw record via sdbWriteNotFree() and then free the record.
 *
 * The record is freed whether or not it was applied. Returns the result of
 * sdbWriteNotFree().
 */
int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw) {
  const int32_t result = sdbWriteNotFree(pSdb, pRaw);

  sdbFreeRaw(pRaw);

  return result;
}

S
Shengliang Guan 已提交
182 183
/*
 * Look up an object by key and take a reference on it.
 *
 * Only rows whose status is SDB_STATUS_READY are handed out; their refCount
 * is incremented and the object pointer is returned. In every other case
 * NULL is returned and terrno is set:
 *   - whatever sdbGetHash() reported, when the table lookup fails;
 *   - TSDB_CODE_SDB_OBJ_NOT_THERE when no row is stored under the key;
 *   - TSDB_CODE_SDB_OBJ_CREATING / TSDB_CODE_SDB_OBJ_DROPPING for rows in
 *     those states;
 *   - TSDB_CODE_SDB_APP_ERROR for any other status.
 */
void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey) {
  SHashObj *hash = sdbGetHash(pSdb, type);
  if (hash == NULL) {
    return NULL;
  }

  int32_t   keySize = sdbGetkeySize(pSdb, type, pKey);
  SRWLatch *pLatch = &pSdb->locks[type];

  taosRLockLatch(pLatch);

  SSdbRow **ppSlot = taosHashGet(hash, pKey, keySize);
  SSdbRow  *pFound = (ppSlot != NULL) ? *ppSlot : NULL;
  if (pFound == NULL) {
    taosRUnLockLatch(pLatch);
    terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
    return NULL;
  }

  void *pObj = NULL;
  if (pFound->status == SDB_STATUS_READY) {
    atomic_add_fetch_32(&pFound->refCount, 1);
    pObj = pFound->pObj;
  } else if (pFound->status == SDB_STATUS_CREATING) {
    terrno = TSDB_CODE_SDB_OBJ_CREATING;
  } else if (pFound->status == SDB_STATUS_DROPPING) {
    terrno = TSDB_CODE_SDB_OBJ_DROPPING;
  } else {
    terrno = TSDB_CODE_SDB_APP_ERROR;
  }

  taosRUnLockLatch(pLatch);
  return pObj;
}

S
Shengliang Guan 已提交
220
/*
 * Drop one reference on an object previously handed out by the sdb.
 *
 * The row header is located sizeof(SSdbRow) bytes before `pObj`. When the
 * reference count falls to zero or below and the row's status is
 * SDB_STATUS_DROPPED, the type's delete callback (if registered) is invoked
 * and the row is freed.
 *
 * A NULL `pObj`, or a row header whose type is outside (SDB_START, SDB_MAX),
 * is ignored.
 */
void sdbRelease(SSdb *pSdb, void *pObj) {
  if (pObj == NULL) {
    return;
  }

  SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow));
  if (!(pRow->type > SDB_START && pRow->type < SDB_MAX)) {
    return;
  }

  SRWLatch *pLatch = &pSdb->locks[pRow->type];
  taosRLockLatch(pLatch);

  int32_t remaining = atomic_sub_fetch_32(&pRow->refCount, 1);
  if (remaining > 0 || pRow->status != SDB_STATUS_DROPPED) {
    // Still referenced, or not dropped yet: the row stays alive.
    taosRUnLockLatch(pLatch);
    return;
  }

  SdbDeleteFp deleteFp = pSdb->deleteFps[pRow->type];
  if (deleteFp != NULL) {
    (*deleteFp)(pSdb, pRow->pObj);
  }
  sdbFreeRow(pRow);

  taosRUnLockLatch(pLatch);
}

S
Shengliang Guan 已提交
242
/*
 * Advance an iteration over the rows of one sdb type.
 *
 * `pIter` is NULL to start, or the value returned by the previous call. The
 * reference taken on the previously returned row is dropped first; then the
 * iterator moves forward, skipping rows whose status is not
 * SDB_STATUS_READY. For the row it stops on, refCount is incremented and
 * `*ppObj` is set to the row's object.
 *
 * Returns the new iterator, or NULL when the table lookup fails or the
 * iteration is exhausted (in which case `*ppObj` is NULL).
 */
void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
  *ppObj = NULL;

  SHashObj *hash = sdbGetHash(pSdb, type);
  if (hash == NULL) return NULL;

  SRWLatch *pLock = &pSdb->locks[type];
  taosRLockLatch(pLock);

  if (pIter != NULL) {
    SSdbRow *pLastRow = *(SSdbRow **)pIter;
    int32_t  ref = atomic_sub_fetch_32(&pLastRow->refCount, 1);
    if (ref <= 0 && pLastRow->status == SDB_STATUS_DROPPED) {
      // Last reference to a dropped row: run the type's delete callback
      // before freeing, the same way sdbRelease() does, so the object's own
      // cleanup is not skipped on this path.
      SdbDeleteFp deleteFp = pSdb->deleteFps[pLastRow->type];
      if (deleteFp != NULL) {
        (*deleteFp)(pSdb, pLastRow->pObj);
      }

      sdbFreeRow(pLastRow);
    }
  }

  SSdbRow **ppRow = taosHashIterate(hash, pIter);
  while (ppRow != NULL) {
    SSdbRow *pRow = *ppRow;
    if (pRow == NULL || pRow->status != SDB_STATUS_READY) {
      ppRow = taosHashIterate(hash, ppRow);
      continue;
    }

    atomic_add_fetch_32(&pRow->refCount, 1);
    *ppObj = pRow->pObj;
    break;
  }
  taosRUnLockLatch(pLock);

  return ppRow;
}

S
Shengliang Guan 已提交
276
/*
 * Abandon an iteration started with sdbFetch() before it is exhausted.
 *
 * The table is located through the type of the row the iterator currently
 * points at, and the hash iteration is cancelled under the read latch.
 * A NULL iterator, or a row whose table cannot be found, is ignored.
 *
 * NOTE(review): the refCount of the current row is not touched here --
 * confirm that callers release the object they were handed by sdbFetch().
 */
void sdbCancelFetch(SSdb *pSdb, void *pIter) {
  if (pIter == NULL) {
    return;
  }

  SSdbRow  *pCurrent = *(SSdbRow **)pIter;
  SHashObj *pTable = sdbGetHash(pSdb, pCurrent->type);
  if (pTable == NULL) {
    return;
  }

  SRWLatch *pLatch = &pSdb->locks[pCurrent->type];

  taosRLockLatch(pLatch);
  taosHashCancelIterate(pTable, pIter);
  taosRUnLockLatch(pLatch);
}

S
Shengliang Guan 已提交
288 289
/*
 * Return the number of entries in the hash table of the given type.
 *
 * Returns 0 when the table lookup fails (terrno is then set by
 * sdbGetHash()).
 */
int32_t sdbGetSize(SSdb *pSdb, ESdbType type) {
  SHashObj *pTable = sdbGetHash(pSdb, type);
  if (pTable == NULL) {
    return 0;
  }

  SRWLatch *pLatch = &pSdb->locks[type];

  taosRLockLatch(pLatch);
  int32_t count = taosHashGetSize(pTable);
  taosRUnLockLatch(pLatch);

  return count;
}
S
Shengliang Guan 已提交
299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323

/*
 * Return one more than the largest id seen for an int32-keyed table.
 *
 * The largest id is the maximum of 0, the leading int32 of every stored
 * object, and the per-type maxId recorded in `pSdb`.
 *
 * Returns -1 when the table lookup fails or the type's key is not
 * SDB_KEY_INT32.
 */
int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) {
  SHashObj *hash = sdbGetHash(pSdb, type);
  if (hash == NULL) {
    return -1;
  }
  if (pSdb->keyTypes[type] != SDB_KEY_INT32) {
    return -1;
  }

  int32_t   largest = 0;
  SRWLatch *pLatch = &pSdb->locks[type];

  taosRLockLatch(pLatch);
  for (SSdbRow **ppRow = taosHashIterate(hash, NULL); ppRow != NULL; ppRow = taosHashIterate(hash, ppRow)) {
    // For int32-keyed tables the id is the leading int32 of the object.
    int32_t id = *(int32_t *)(*ppRow)->pObj;
    largest = MAX(id, largest);
  }
  taosRUnLockLatch(pLatch);

  largest = MAX(largest, pSdb->maxId[type]);
  return largest + 1;
}