sdbHash.c 11.9 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "sdbInt.h"

19 20
static void sdbCheck(SSdb *pSdb, SSdbRow *pRow);

S
Shengliang Guan 已提交
21
const char *sdbTableName(ESdbType type) {
S
Shengliang Guan 已提交
22 23 24 25 26 27 28
  switch (type) {
    case SDB_TRANS:
      return "trans";
    case SDB_CLUSTER:
      return "cluster";
    case SDB_MNODE:
      return "mnode";
S
Shengliang Guan 已提交
29 30 31 32 33 34
    case SDB_QNODE:
      return "qnode";
    case SDB_SNODE:
      return "snode";
    case SDB_BNODE:
      return "bnode";
S
Shengliang Guan 已提交
35 36 37 38 39 40 41 42
    case SDB_DNODE:
      return "dnode";
    case SDB_USER:
      return "user";
    case SDB_AUTH:
      return "auth";
    case SDB_ACCT:
      return "acct";
L
add log  
Liu Jicong 已提交
43 44 45 46
    case SDB_STREAM:
      return "stream";
    case SDB_OFFSET:
      return "offset";
L
Liu Jicong 已提交
47 48
    case SDB_SUBSCRIBE:
      return "subscribe";
S
Shengliang Guan 已提交
49 50
    case SDB_CONSUMER:
      return "consumer";
S
Shengliang Guan 已提交
51 52 53
    case SDB_TOPIC:
      return "topic";
    case SDB_VGROUP:
54 55 56
      return "vgroup";
    case SDB_SMA:
      return "sma";
S
Shengliang Guan 已提交
57 58 59 60 61 62 63 64 65 66 67
    case SDB_STB:
      return "stb";
    case SDB_DB:
      return "db";
    case SDB_FUNC:
      return "func";
    default:
      return "undefine";
  }
}

S
Shengliang Guan 已提交
68 69 70 71 72 73 74 75 76 77 78 79
static const char *sdbStatusStr(ESdbStatus status) {
  switch (status) {
    case SDB_STATUS_CREATING:
      return "creating";
    case SDB_STATUS_UPDATING:
      return "updating";
    case SDB_STATUS_DROPPING:
      return "dropping";
    case SDB_STATUS_READY:
      return "ready";
    case SDB_STATUS_DROPPED:
      return "dropped";
80 81
    case SDB_STATUS_INIT:
      return "init";
S
Shengliang Guan 已提交
82 83 84 85 86
    default:
      return "undefine";
  }
}

S
Shengliang Guan 已提交
87 88 89 90
void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper) {
  EKeyType keyType = pSdb->keyTypes[pRow->type];

  if (keyType == SDB_KEY_BINARY) {
91 92
    mTrace("%s:%s, ref:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), (char *)pRow->pObj, pRow->refCount, oper,
           pRow->pObj, sdbStatusStr(pRow->status));
S
Shengliang Guan 已提交
93
  } else if (keyType == SDB_KEY_INT32) {
94 95
    mTrace("%s:%d, ref:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), *(int32_t *)pRow->pObj, pRow->refCount,
           oper, pRow->pObj, sdbStatusStr(pRow->status));
S
Shengliang Guan 已提交
96
  } else if (keyType == SDB_KEY_INT64) {
97
    mTrace("%s:%" PRId64 ", ref:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), *(int64_t *)pRow->pObj,
S
Shengliang Guan 已提交
98
           pRow->refCount, oper, pRow->pObj, sdbStatusStr(pRow->status));
S
Shengliang Guan 已提交
99 100 101 102
  } else {
  }
}

S
Shengliang Guan 已提交
103
static SHashObj *sdbGetHash(SSdb *pSdb, int32_t type) {
S
Shengliang Guan 已提交
104
  if (type >= SDB_MAX || type < 0) {
S
Shengliang Guan 已提交
105 106 107 108
    terrno = TSDB_CODE_SDB_INVALID_TABLE_TYPE;
    return NULL;
  }

S
Shengliang Guan 已提交
109
  SHashObj *hash = pSdb->hashObjs[type];
S
Shengliang Guan 已提交
110 111 112 113 114 115 116 117
  if (hash == NULL) {
    terrno = TSDB_CODE_SDB_APP_ERROR;
    return NULL;
  }

  return hash;
}

L
Liu Jicong 已提交
118
static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, const void *pKey) {
S
Shengliang Guan 已提交
119
  int32_t  keySize;
S
Shengliang Guan 已提交
120
  EKeyType keyType = pSdb->keyTypes[type];
S
Shengliang Guan 已提交
121 122 123 124 125 126 127 128 129 130 131 132

  if (keyType == SDB_KEY_INT32) {
    keySize = sizeof(int32_t);
  } else if (keyType == SDB_KEY_BINARY) {
    keySize = strlen(pKey) + 1;
  } else {
    keySize = sizeof(int64_t);
  }

  return keySize;
}

S
Shengliang Guan 已提交
133
static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
S
Shengliang Guan 已提交
134
  SRWLatch *pLock = &pSdb->locks[pRow->type];
S
Shengliang Guan 已提交
135 136
  taosWLockLatch(pLock);

S
Shengliang Guan 已提交
137 138
  SSdbRow *pOldRow = taosHashGet(hash, pRow->pObj, keySize);
  if (pOldRow != NULL) {
S
Shengliang Guan 已提交
139
    taosWUnLockLatch(pLock);
S
Shengliang Guan 已提交
140
    sdbFreeRow(pSdb, pRow);
S
Shengliang Guan 已提交
141 142
    terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE;
    return terrno;
S
Shengliang Guan 已提交
143 144
  }

S
Shengliang Guan 已提交
145
  pRow->refCount = 0;
S
Shengliang Guan 已提交
146
  pRow->status = pRaw->status;
147
  sdbPrintOper(pSdb, pRow, "insert");
S
Shengliang Guan 已提交
148 149

  if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) {
S
Shengliang Guan 已提交
150
    taosWUnLockLatch(pLock);
S
Shengliang Guan 已提交
151
    sdbFreeRow(pSdb, pRow);
S
Shengliang Guan 已提交
152 153
    terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE;
    return terrno;
S
Shengliang Guan 已提交
154 155
  }

S
Shengliang Guan 已提交
156 157
  taosWUnLockLatch(pLock);

S
Shengliang Guan 已提交
158
  int32_t     code = 0;
S
Shengliang Guan 已提交
159
  SdbInsertFp insertFp = pSdb->insertFps[pRow->type];
S
Shengliang Guan 已提交
160
  if (insertFp != NULL) {
S
Shengliang Guan 已提交
161 162
    code = (*insertFp)(pSdb, pRow->pObj);
    if (code != 0) {
S
Shengliang Guan 已提交
163
      code = terrno;
S
Shengliang Guan 已提交
164
      taosWLockLatch(pLock);
S
Shengliang Guan 已提交
165
      taosHashRemove(hash, pRow->pObj, keySize);
S
Shengliang Guan 已提交
166
      taosWUnLockLatch(pLock);
S
Shengliang Guan 已提交
167
      sdbFreeRow(pSdb, pRow);
S
Shengliang Guan 已提交
168 169
      terrno = code;
      return terrno;
S
Shengliang Guan 已提交
170 171 172
    }
  }

S
Shengliang Guan 已提交
173
  if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT32) {
dengyihao's avatar
dengyihao 已提交
174
    pSdb->maxId[pRow->type] = TMAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj));
S
Shengliang Guan 已提交
175 176
  }
  if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT64) {
dengyihao's avatar
dengyihao 已提交
177
    pSdb->maxId[pRow->type] = TMAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj));
S
Shengliang Guan 已提交
178 179 180
  }
  pSdb->tableVer[pRow->type]++;

S
Shengliang Guan 已提交
181 182 183
  return 0;
}

S
Shengliang Guan 已提交
184 185
static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pNewRow, int32_t keySize) {
  SRWLatch *pLock = &pSdb->locks[pNewRow->type];
S
Shengliang Guan 已提交
186 187
  taosRLockLatch(pLock);

S
Shengliang Guan 已提交
188 189
  SSdbRow **ppOldRow = taosHashGet(hash, pNewRow->pObj, keySize);
  if (ppOldRow == NULL || *ppOldRow == NULL) {
S
Shengliang Guan 已提交
190
    taosRUnLockLatch(pLock);
S
Shengliang Guan 已提交
191
    return sdbInsertRow(pSdb, hash, pRaw, pNewRow, keySize);
S
Shengliang Guan 已提交
192 193
  }

S
Shengliang Guan 已提交
194
  SSdbRow *pOldRow = *ppOldRow;
S
Shengliang Guan 已提交
195
  pOldRow->status = pRaw->status;
196
  sdbPrintOper(pSdb, pOldRow, "update");
S
Shengliang Guan 已提交
197 198
  taosRUnLockLatch(pLock);

S
Shengliang Guan 已提交
199
  int32_t     code = 0;
S
Shengliang Guan 已提交
200
  SdbUpdateFp updateFp = pSdb->updateFps[pNewRow->type];
S
Shengliang Guan 已提交
201
  if (updateFp != NULL) {
S
Shengliang Guan 已提交
202
    code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj);
S
Shengliang Guan 已提交
203 204
  }

S
Shengliang Guan 已提交
205
  sdbFreeRow(pSdb, pNewRow);
S
Shengliang Guan 已提交
206 207

  pSdb->tableVer[pOldRow->type]++;
S
Shengliang Guan 已提交
208
  return code;
S
Shengliang Guan 已提交
209 210
}

S
Shengliang Guan 已提交
211
static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
S
Shengliang Guan 已提交
212
  SRWLatch *pLock = &pSdb->locks[pRow->type];
S
Shengliang Guan 已提交
213 214
  taosWLockLatch(pLock);

S
Shengliang Guan 已提交
215 216
  SSdbRow **ppOldRow = taosHashGet(hash, pRow->pObj, keySize);
  if (ppOldRow == NULL || *ppOldRow == NULL) {
S
Shengliang Guan 已提交
217
    taosWUnLockLatch(pLock);
S
Shengliang Guan 已提交
218
    sdbFreeRow(pSdb, pRow);
S
Shengliang Guan 已提交
219 220
    terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
    return terrno;
S
Shengliang Guan 已提交
221
  }
S
Shengliang Guan 已提交
222
  SSdbRow *pOldRow = *ppOldRow;
S
Shengliang Guan 已提交
223

S
Shengliang Guan 已提交
224
  pOldRow->status = pRaw->status;
225
  sdbPrintOper(pSdb, pOldRow, "delete");
S
Shengliang Guan 已提交
226

S
Shengliang Guan 已提交
227
  taosHashRemove(hash, pOldRow->pObj, keySize);
S
Shengliang Guan 已提交
228 229
  taosWUnLockLatch(pLock);

S
Shengliang Guan 已提交
230
  pSdb->tableVer[pOldRow->type]++;
S
Shengliang Guan 已提交
231
  sdbFreeRow(pSdb, pRow);
232 233

  sdbCheck(pSdb, pOldRow);
S
Shengliang Guan 已提交
234 235
  // sdbRelease(pSdb, pOldRow->pObj);
  return 0;
S
Shengliang Guan 已提交
236 237
}

238
int32_t sdbWriteWithoutFree(SSdb *pSdb, SSdbRaw *pRaw) {
S
Shengliang Guan 已提交
239
  SHashObj *hash = sdbGetHash(pSdb, pRaw->type);
S
Shengliang Guan 已提交
240
  if (hash == NULL) return terrno;
S
Shengliang Guan 已提交
241

S
Shengliang Guan 已提交
242
  SdbDecodeFp decodeFp = pSdb->decodeFps[pRaw->type];
S
Shengliang Guan 已提交
243 244
  SSdbRow    *pRow = (*decodeFp)(pRaw);
  if (pRow == NULL) {
S
Shengliang Guan 已提交
245
    return terrno;
S
Shengliang Guan 已提交
246 247
  }

S
Shengliang Guan 已提交
248
  pRow->type = pRaw->type;
S
Shengliang Guan 已提交
249

S
Shengliang Guan 已提交
250
  int32_t keySize = sdbGetkeySize(pSdb, pRow->type, pRow->pObj);
S
Shengliang Guan 已提交
251
  int32_t code = TSDB_CODE_SDB_INVALID_ACTION_TYPE;
S
Shengliang Guan 已提交
252 253 254

  switch (pRaw->status) {
    case SDB_STATUS_CREATING:
S
Shengliang Guan 已提交
255
      code = sdbInsertRow(pSdb, hash, pRaw, pRow, keySize);
S
Shengliang Guan 已提交
256
      break;
S
Shengliang Guan 已提交
257
    case SDB_STATUS_UPDATING:
S
Shengliang Guan 已提交
258
    case SDB_STATUS_READY:
S
Shengliang Guan 已提交
259
    case SDB_STATUS_DROPPING:
S
Shengliang Guan 已提交
260
      code = sdbUpdateRow(pSdb, hash, pRaw, pRow, keySize);
S
Shengliang Guan 已提交
261 262
      break;
    case SDB_STATUS_DROPPED:
S
Shengliang Guan 已提交
263
      code = sdbDeleteRow(pSdb, hash, pRaw, pRow, keySize);
S
Shengliang Guan 已提交
264
      break;
S
Shengliang Guan 已提交
265 266
  }

267
  return code;
S
Shengliang Guan 已提交
268 269
}

S
Shengliang Guan 已提交
270
int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw) {
271
  int32_t code = sdbWriteWithoutFree(pSdb, pRaw);
S
Shengliang Guan 已提交
272 273 274 275
  sdbFreeRaw(pRaw);
  return code;
}

L
Liu Jicong 已提交
276
void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) {
277 278
  terrno = 0;

S
Shengliang Guan 已提交
279
  SHashObj *hash = sdbGetHash(pSdb, type);
S
Shengliang Guan 已提交
280 281 282
  if (hash == NULL) return NULL;

  void   *pRet = NULL;
S
Shengliang Guan 已提交
283
  int32_t keySize = sdbGetkeySize(pSdb, type, pKey);
S
Shengliang Guan 已提交
284

S
Shengliang Guan 已提交
285
  SRWLatch *pLock = &pSdb->locks[type];
S
Shengliang Guan 已提交
286 287 288
  taosRLockLatch(pLock);

  SSdbRow **ppRow = taosHashGet(hash, pKey, keySize);
S
Shengliang Guan 已提交
289
  if (ppRow == NULL || *ppRow == NULL) {
S
Shengliang Guan 已提交
290
    taosRUnLockLatch(pLock);
S
Shengliang Guan 已提交
291
    terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
S
Shengliang Guan 已提交
292 293 294
    return NULL;
  }

S
Shengliang Guan 已提交
295 296 297
  SSdbRow *pRow = *ppRow;
  switch (pRow->status) {
    case SDB_STATUS_READY:
S
Shengliang Guan 已提交
298
    case SDB_STATUS_UPDATING:
S
Shengliang Guan 已提交
299 300
      atomic_add_fetch_32(&pRow->refCount, 1);
      pRet = pRow->pObj;
301
      sdbPrintOper(pSdb, pRow, "acquire");
S
Shengliang Guan 已提交
302
      break;
S
Shengliang Guan 已提交
303 304
    case SDB_STATUS_CREATING:
      terrno = TSDB_CODE_SDB_OBJ_CREATING;
S
Shengliang Guan 已提交
305
      break;
S
Shengliang Guan 已提交
306
    case SDB_STATUS_DROPPING:
S
Shengliang Guan 已提交
307
      terrno = TSDB_CODE_SDB_OBJ_DROPPING;
S
Shengliang Guan 已提交
308 309
      break;
    default:
S
Shengliang Guan 已提交
310 311
      terrno = TSDB_CODE_SDB_APP_ERROR;
      break;
S
Shengliang Guan 已提交
312 313
  }

S
Shengliang Guan 已提交
314 315
  taosRUnLockLatch(pLock);
  return pRet;
S
Shengliang Guan 已提交
316 317
}

318 319 320 321 322
static void sdbCheck(SSdb *pSdb, SSdbRow *pRow) {
  SRWLatch *pLock = &pSdb->locks[pRow->type];
  taosRLockLatch(pLock);

  int32_t ref = atomic_load_32(&pRow->refCount);
323
  sdbPrintOper(pSdb, pRow, "check");
324 325 326 327 328 329 330
  if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) {
    sdbFreeRow(pSdb, pRow);
  }

  taosRUnLockLatch(pLock);
}

S
Shengliang Guan 已提交
331
void sdbRelease(SSdb *pSdb, void *pObj) {
S
Shengliang Guan 已提交
332 333
  if (pObj == NULL) return;

S
Shengliang Guan 已提交
334
  SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow));
335
  if (pRow->type >= SDB_MAX) return;
S
Shengliang Guan 已提交
336

S
Shengliang Guan 已提交
337
  SRWLatch *pLock = &pSdb->locks[pRow->type];
S
Shengliang Guan 已提交
338 339 340
  taosRLockLatch(pLock);

  int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1);
341
  sdbPrintOper(pSdb, pRow, "release");
S
Shengliang Guan 已提交
342
  if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) {
S
Shengliang Guan 已提交
343
    sdbFreeRow(pSdb, pRow);
S
Shengliang Guan 已提交
344 345 346
  }

  taosRUnLockLatch(pLock);
S
Shengliang Guan 已提交
347 348
}

S
Shengliang Guan 已提交
349
void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
350 351
  *ppObj = NULL;

S
Shengliang Guan 已提交
352
  SHashObj *hash = sdbGetHash(pSdb, type);
S
Shengliang Guan 已提交
353
  if (hash == NULL) return NULL;
S
Shengliang Guan 已提交
354

S
Shengliang Guan 已提交
355
  SRWLatch *pLock = &pSdb->locks[type];
S
Shengliang Guan 已提交
356
  taosRLockLatch(pLock);
S
Shengliang Guan 已提交
357

358
#if 0
S
Shengliang Guan 已提交
359 360
  if (pIter != NULL) {
    SSdbRow *pLastRow = *(SSdbRow **)pIter;
S
Shengliang Guan 已提交
361
    int32_t  ref = atomic_load_32(&pLastRow->refCount);
S
Shengliang Guan 已提交
362
    if (ref <= 0 && pLastRow->status == SDB_STATUS_DROPPED) {
S
Shengliang Guan 已提交
363
      sdbFreeRow(pSdb, pLastRow);
S
Shengliang Guan 已提交
364 365
    }
  }
366
#endif
S
Shengliang Guan 已提交
367 368

  SSdbRow **ppRow = taosHashIterate(hash, pIter);
S
Shengliang Guan 已提交
369 370 371 372 373 374 375 376
  while (ppRow != NULL) {
    SSdbRow *pRow = *ppRow;
    if (pRow == NULL || pRow->status != SDB_STATUS_READY) {
      ppRow = taosHashIterate(hash, ppRow);
      continue;
    }

    atomic_add_fetch_32(&pRow->refCount, 1);
377
    sdbPrintOper(pSdb, pRow, "fetch");
S
Shengliang Guan 已提交
378 379 380
    *ppObj = pRow->pObj;
    break;
  }
S
Shengliang Guan 已提交
381 382
  taosRUnLockLatch(pLock);

S
Shengliang Guan 已提交
383
  return ppRow;
S
Shengliang Guan 已提交
384 385
}

S
Shengliang Guan 已提交
386
void sdbCancelFetch(SSdb *pSdb, void *pIter) {
S
Shengliang Guan 已提交
387 388
  if (pIter == NULL) return;
  SSdbRow  *pRow = *(SSdbRow **)pIter;
S
Shengliang Guan 已提交
389
  SHashObj *hash = sdbGetHash(pSdb, pRow->type);
S
Shengliang Guan 已提交
390
  if (hash == NULL) return;
S
Shengliang Guan 已提交
391

S
Shengliang Guan 已提交
392
  SRWLatch *pLock = &pSdb->locks[pRow->type];
S
Shengliang Guan 已提交
393
  taosRLockLatch(pLock);
S
Shengliang Guan 已提交
394
  taosHashCancelIterate(hash, pIter);
S
Shengliang Guan 已提交
395
  taosRUnLockLatch(pLock);
S
Shengliang Guan 已提交
396 397
}

S
Shengliang Guan 已提交
398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421
void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2, void *p3) {
  SHashObj *hash = sdbGetHash(pSdb, type);
  if (hash == NULL) return;

  SRWLatch *pLock = &pSdb->locks[type];
  taosRLockLatch(pLock);

  SSdbRow **ppRow = taosHashIterate(hash, NULL);
  while (ppRow != NULL) {
    SSdbRow *pRow = *ppRow;
    if (pRow->status == SDB_STATUS_READY) {
      bool isContinue = (*fp)(pSdb->pMnode, pRow->pObj, p1, p2, p3);
      if (!isContinue) {
        taosHashCancelIterate(hash, ppRow);
        break;
      }
    }

    ppRow = taosHashIterate(hash, ppRow);
  }

  taosRUnLockLatch(pLock);
}

S
Shengliang Guan 已提交
422 423
int32_t sdbGetSize(SSdb *pSdb, ESdbType type) {
  SHashObj *hash = sdbGetHash(pSdb, type);
S
Shengliang Guan 已提交
424
  if (hash == NULL) return 0;
S
Shengliang Guan 已提交
425

S
Shengliang Guan 已提交
426
  SRWLatch *pLock = &pSdb->locks[type];
S
Shengliang Guan 已提交
427 428 429 430 431
  taosRLockLatch(pLock);
  int32_t size = taosHashGetSize(hash);
  taosRUnLockLatch(pLock);

  return size;
S
Shengliang Guan 已提交
432
}
S
Shengliang Guan 已提交
433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448

int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) {
  SHashObj *hash = sdbGetHash(pSdb, type);
  if (hash == NULL) return -1;

  if (pSdb->keyTypes[type] != SDB_KEY_INT32) return -1;

  int32_t maxId = 0;

  SRWLatch *pLock = &pSdb->locks[type];
  taosRLockLatch(pLock);

  SSdbRow **ppRow = taosHashIterate(hash, NULL);
  while (ppRow != NULL) {
    SSdbRow *pRow = *ppRow;
    int32_t  id = *(int32_t *)pRow->pObj;
dengyihao's avatar
dengyihao 已提交
449
    maxId = TMAX(id, maxId);
S
Shengliang Guan 已提交
450 451 452 453 454
    ppRow = taosHashIterate(hash, ppRow);
  }

  taosRUnLockLatch(pLock);

dengyihao's avatar
dengyihao 已提交
455
  maxId = TMAX(maxId, pSdb->maxId[type]);
S
Shengliang Guan 已提交
456 457
  return maxId + 1;
}
458 459 460 461 462 463 464 465 466

int64_t sdbGetTableVer(SSdb *pSdb, ESdbType type) {
  if (type >= SDB_MAX || type < 0) {
    terrno = TSDB_CODE_SDB_INVALID_TABLE_TYPE;
    return -1;
  }

  return pSdb->tableVer[type];
}