metaBDBImpl.c 23.0 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#define ALLOW_FORBID_FUNC
H
more  
Hongze Cheng 已提交
17 18
#include "db.h"

H
Hongze Cheng 已提交
19 20
#include "metaDef.h"

H
Hongze Cheng 已提交
21
#include "tcoding.h"
H
more  
Hongze Cheng 已提交
22
#include "thash.h"
H
Hongze Cheng 已提交
23

H
Hongze Cheng 已提交
24 25 26 27
#define IMPL_WITH_LOCK 1
// #if IMPL_WITH_LOCK
// #endif

H
Hongze Cheng 已提交
28 29 30
typedef struct {
  tb_uid_t uid;
  int32_t  sver;
H
Hongze Cheng 已提交
31
  int32_t  padding;
H
Hongze Cheng 已提交
32 33
} SSchemaKey;

H
Hongze Cheng 已提交
34
struct SMetaDB {
H
Hongze Cheng 已提交
35
#if IMPL_WITH_LOCK
wafwerar's avatar
wafwerar 已提交
36
  TdThreadRwlock rwlock;
H
Hongze Cheng 已提交
37
#endif
H
more  
Hongze Cheng 已提交
38
  // DB
H
more  
Hongze Cheng 已提交
39 40
  DB *pTbDB;
  DB *pSchemaDB;
C
Cary Xu 已提交
41
  DB *pSmaDB;
C
Cary Xu 已提交
42

H
more  
Hongze Cheng 已提交
43
  // IDX
H
more  
Hongze Cheng 已提交
44 45 46 47
  DB *pNameIdx;
  DB *pStbIdx;
  DB *pNtbIdx;
  DB *pCtbIdx;
C
Cary Xu 已提交
48
  DB *pSmaIdx;
H
more  
Hongze Cheng 已提交
49
  // ENV
H
Hongze Cheng 已提交
50 51 52
  DB_ENV *pEvn;
};

H
Hongze Cheng 已提交
53 54
typedef int (*bdbIdxCbPtr)(DB *, const DBT *, const DBT *, DBT *);

H
more  
Hongze Cheng 已提交
55 56
static SMetaDB *metaNewDB();
static void     metaFreeDB(SMetaDB *pDB);
H
more  
Hongze Cheng 已提交
57 58
static int      metaOpenBDBEnv(DB_ENV **ppEnv, const char *path);
static void     metaCloseBDBEnv(DB_ENV *pEnv);
H
Hongze Cheng 已提交
59
static int      metaOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup);
H
more  
Hongze Cheng 已提交
60
static void     metaCloseBDBDb(DB *pDB);
H
Hongze Cheng 已提交
61
static int      metaOpenBDBIdx(DB **ppIdx, DB_ENV *pEnv, const char *pFName, DB *pDB, bdbIdxCbPtr cbf, bool isDup);
H
Hongze Cheng 已提交
62
static void     metaCloseBDBIdx(DB *pIdx);
H
Hongze Cheng 已提交
63 64 65 66
static int      metaNameIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
static int      metaStbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
static int      metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
static int      metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
C
Cary Xu 已提交
67
static int      metaSmaIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
H
Hongze Cheng 已提交
68
static int      metaEncodeTbInfo(void **buf, STbCfg *pTbCfg);
L
fix  
Liu Jicong 已提交
69
static void    *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg);
H
Hongze Cheng 已提交
70
static void     metaClearTbCfg(STbCfg *pTbCfg);
H
Hongze Cheng 已提交
71
static int      metaEncodeSchema(void **buf, SSchemaWrapper *pSW);
L
fix  
Liu Jicong 已提交
72
static void    *metaDecodeSchema(void *buf, SSchemaWrapper *pSW);
H
Hongze Cheng 已提交
73 74 75
static void     metaDBWLock(SMetaDB *pDB);
static void     metaDBRLock(SMetaDB *pDB);
static void     metaDBULock(SMetaDB *pDB);
H
more  
Hongze Cheng 已提交
76

H
more  
Hongze Cheng 已提交
77
#define BDB_PERR(info, code) fprintf(stderr, info " reason: %s", db_strerror(code))
H
Hongze Cheng 已提交
78

H
Hongze Cheng 已提交
79
int metaOpenDB(SMeta *pMeta) {
H
more  
Hongze Cheng 已提交
80
  SMetaDB *pDB;
H
Hongze Cheng 已提交
81

H
more  
Hongze Cheng 已提交
82 83 84
  // Create DB object
  pDB = metaNewDB();
  if (pDB == NULL) {
H
more  
Hongze Cheng 已提交
85 86 87
    return -1;
  }

H
more  
Hongze Cheng 已提交
88
  pMeta->pDB = pDB;
H
more  
Hongze Cheng 已提交
89

H
more  
Hongze Cheng 已提交
90 91
  // Open DB Env
  if (metaOpenBDBEnv(&(pDB->pEvn), pMeta->path) < 0) {
H
more  
Hongze Cheng 已提交
92
    metaCloseDB(pMeta);
H
more  
Hongze Cheng 已提交
93 94
    return -1;
  }
H
Hongze Cheng 已提交
95

H
more  
Hongze Cheng 已提交
96
  // Open DBs
H
Hongze Cheng 已提交
97
  if (metaOpenBDBDb(&(pDB->pTbDB), pDB->pEvn, "meta.db", false) < 0) {
H
more  
Hongze Cheng 已提交
98 99
    metaCloseDB(pMeta);
    return -1;
H
more  
Hongze Cheng 已提交
100 101
  }

H
more  
Hongze Cheng 已提交
102
  if (metaOpenBDBDb(&(pDB->pSchemaDB), pDB->pEvn, "schema.db", false) < 0) {
H
more  
Hongze Cheng 已提交
103 104 105
    metaCloseDB(pMeta);
    return -1;
  }
H
Hongze Cheng 已提交
106

C
Cary Xu 已提交
107 108 109 110 111
  if (metaOpenBDBDb(&(pDB->pSmaDB), pDB->pEvn, "sma.db", false) < 0) {
    metaCloseDB(pMeta);
    return -1;
  }

H
more  
Hongze Cheng 已提交
112
  // Open Indices
H
Hongze Cheng 已提交
113
  if (metaOpenBDBIdx(&(pDB->pNameIdx), pDB->pEvn, "name.index", pDB->pTbDB, &metaNameIdxCb, false) < 0) {
H
more  
Hongze Cheng 已提交
114 115 116
    metaCloseDB(pMeta);
    return -1;
  }
H
Hongze Cheng 已提交
117

H
Hongze Cheng 已提交
118
  if (metaOpenBDBIdx(&(pDB->pStbIdx), pDB->pEvn, "stb.index", pDB->pTbDB, &metaStbIdxCb, false) < 0) {
H
more  
Hongze Cheng 已提交
119 120 121
    metaCloseDB(pMeta);
    return -1;
  }
H
Hongze Cheng 已提交
122

H
Hongze Cheng 已提交
123
  if (metaOpenBDBIdx(&(pDB->pNtbIdx), pDB->pEvn, "ntb.index", pDB->pTbDB, &metaNtbIdxCb, false) < 0) {
H
more  
Hongze Cheng 已提交
124 125
    metaCloseDB(pMeta);
    return -1;
H
Hongze Cheng 已提交
126 127
  }

H
Hongze Cheng 已提交
128
  if (metaOpenBDBIdx(&(pDB->pCtbIdx), pDB->pEvn, "ctb.index", pDB->pTbDB, &metaCtbIdxCb, true) < 0) {
H
more  
Hongze Cheng 已提交
129 130
    metaCloseDB(pMeta);
    return -1;
H
more  
Hongze Cheng 已提交
131 132
  }

C
Cary Xu 已提交
133 134 135 136 137
  if (metaOpenBDBIdx(&(pDB->pSmaIdx), pDB->pEvn, "sma.index", pDB->pSmaDB, &metaSmaIdxCb, true) < 0) {
    metaCloseDB(pMeta);
    return -1;
  }

H
Hongze Cheng 已提交
138 139 140 141 142
  return 0;
}

void metaCloseDB(SMeta *pMeta) {
  if (pMeta->pDB) {
C
Cary Xu 已提交
143
    metaCloseBDBIdx(pMeta->pDB->pSmaIdx);
H
Hongze Cheng 已提交
144 145 146 147
    metaCloseBDBIdx(pMeta->pDB->pCtbIdx);
    metaCloseBDBIdx(pMeta->pDB->pNtbIdx);
    metaCloseBDBIdx(pMeta->pDB->pStbIdx);
    metaCloseBDBIdx(pMeta->pDB->pNameIdx);
C
Cary Xu 已提交
148
    metaCloseBDBDb(pMeta->pDB->pSmaDB);
H
Hongze Cheng 已提交
149 150
    metaCloseBDBDb(pMeta->pDB->pSchemaDB);
    metaCloseBDBDb(pMeta->pDB->pTbDB);
H
more  
Hongze Cheng 已提交
151
    metaCloseBDBEnv(pMeta->pDB->pEvn);
H
more  
Hongze Cheng 已提交
152 153
    metaFreeDB(pMeta->pDB);
    pMeta->pDB = NULL;
H
Hongze Cheng 已提交
154 155 156
  }
}

H
more  
Hongze Cheng 已提交
157
int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
H
more  
Hongze Cheng 已提交
158 159
  tb_uid_t uid;
  char     buf[512];
H
fix  
Hongze Cheng 已提交
160
  char     buf1[512];
L
fix  
Liu Jicong 已提交
161
  void    *pBuf;
H
Hongze Cheng 已提交
162 163
  DBT      key1, value1;
  DBT      key2, value2;
H
more  
Hongze Cheng 已提交
164
  SSchema *pSchema = NULL;
H
Hongze Cheng 已提交
165 166 167 168 169 170 171 172 173

  if (pTbCfg->type == META_SUPER_TABLE) {
    uid = pTbCfg->stbCfg.suid;
  } else {
    uid = metaGenerateUid(pMeta);
  }

  {
    // save table info
H
Hongze Cheng 已提交
174
    pBuf = buf;
H
Hongze Cheng 已提交
175 176
    memset(&key1, 0, sizeof(key1));
    memset(&value1, 0, sizeof(key1));
H
Hongze Cheng 已提交
177

H
Hongze Cheng 已提交
178 179
    key1.data = &uid;
    key1.size = sizeof(uid);
H
Hongze Cheng 已提交
180

H
Hongze Cheng 已提交
181
    metaEncodeTbInfo(&pBuf, pTbCfg);
H
Hongze Cheng 已提交
182

H
Hongze Cheng 已提交
183 184 185
    value1.data = buf;
    value1.size = POINTER_DISTANCE(pBuf, buf);
    value1.app_data = pTbCfg;
H
Hongze Cheng 已提交
186 187 188
  }

  // save schema
H
more  
Hongze Cheng 已提交
189
  uint32_t ncols;
H
Hongze Cheng 已提交
190
  if (pTbCfg->type == META_SUPER_TABLE) {
H
more  
Hongze Cheng 已提交
191
    ncols = pTbCfg->stbCfg.nCols;
H
Hongze Cheng 已提交
192 193
    pSchema = pTbCfg->stbCfg.pSchema;
  } else if (pTbCfg->type == META_NORMAL_TABLE) {
H
more  
Hongze Cheng 已提交
194
    ncols = pTbCfg->ntbCfg.nCols;
H
Hongze Cheng 已提交
195 196 197 198
    pSchema = pTbCfg->ntbCfg.pSchema;
  }

  if (pSchema) {
H
fix  
Hongze Cheng 已提交
199
    pBuf = buf1;
H
Hongze Cheng 已提交
200 201
    memset(&key2, 0, sizeof(key2));
    memset(&value2, 0, sizeof(key2));
H
Hongze Cheng 已提交
202
    SSchemaKey schemaKey = {uid, 0 /*TODO*/, 0};
H
Hongze Cheng 已提交
203

H
Hongze Cheng 已提交
204 205
    key2.data = &schemaKey;
    key2.size = sizeof(schemaKey);
H
Hongze Cheng 已提交
206

H
Hongze Cheng 已提交
207 208
    SSchemaWrapper sw = {.nCols = ncols, .pSchema = pSchema};
    metaEncodeSchema(&pBuf, &sw);
H
Hongze Cheng 已提交
209

H
fix  
Hongze Cheng 已提交
210 211
    value2.data = buf1;
    value2.size = POINTER_DISTANCE(pBuf, buf1);
H
Hongze Cheng 已提交
212 213
  }

H
Hongze Cheng 已提交
214 215
  metaDBWLock(pMeta->pDB);
  pMeta->pDB->pTbDB->put(pMeta->pDB->pTbDB, NULL, &key1, &value1, 0);
H
fix  
Hongze Cheng 已提交
216 217 218
  if (pSchema) {
    pMeta->pDB->pSchemaDB->put(pMeta->pDB->pSchemaDB, NULL, &key2, &value2, 0);
  }
H
Hongze Cheng 已提交
219 220
  metaDBULock(pMeta->pDB);

H
more  
Hongze Cheng 已提交
221 222 223 224 225 226 227 228
  return 0;
}

int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) {
  // TODO
  return 0;
}

C
Cary Xu 已提交
229
int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
C
Cary Xu 已提交
230 231
  // char  buf[512] = {0};  // TODO: may overflow
  void *pBuf = NULL, *qBuf = NULL;
C
Cary Xu 已提交
232 233
  DBT   key1 = {0}, value1 = {0};

234 235
  // save sma info
  int32_t len = tEncodeTSma(NULL, pSmaCfg);
236
  pBuf = taosMemoryCalloc(1, len);
237 238 239
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
C
Cary Xu 已提交
240 241
  }

242 243 244 245 246 247 248 249 250 251
  key1.data = (void *)&pSmaCfg->indexUid;
  key1.size = sizeof(pSmaCfg->indexUid);

  qBuf = pBuf;
  tEncodeTSma(&qBuf, pSmaCfg);

  value1.data = pBuf;
  value1.size = POINTER_DISTANCE(qBuf, pBuf);
  value1.app_data = pSmaCfg;

C
Cary Xu 已提交
252 253 254 255
  metaDBWLock(pMeta->pDB);
  pMeta->pDB->pSmaDB->put(pMeta->pDB->pSmaDB, NULL, &key1, &value1, 0);
  metaDBULock(pMeta->pDB);

256
  // release
wafwerar's avatar
wafwerar 已提交
257
  taosMemoryFreeClear(pBuf);
258

C
Cary Xu 已提交
259 260 261
  return 0;
}

C
Cary Xu 已提交
262
int metaRemoveSmaFromDb(SMeta *pMeta,  int64_t indexUid) {
C
Cary Xu 已提交
263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278
  // TODO
#if 0
  DBT key = {0};

  key.data = (void *)indexName;
  key.size = strlen(indexName);

  metaDBWLock(pMeta->pDB);
  // TODO: No guarantee of consistence.
  // Use transaction or DB->sync() for some guarantee.
  pMeta->pDB->pSmaDB->del(pMeta->pDB->pSmaDB, NULL, &key, 0);
  metaDBULock(pMeta->pDB);
#endif
  return 0;
}

H
more  
Hongze Cheng 已提交
279
/* ------------------------ STATIC METHODS ------------------------ */
H
Hongze Cheng 已提交
280 281 282 283 284 285 286 287
static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW) {
  int      tlen = 0;
  SSchema *pSchema;

  tlen += taosEncodeFixedU32(buf, pSW->nCols);
  for (int i = 0; i < pSW->nCols; i++) {
    pSchema = pSW->pSchema + i;
    tlen += taosEncodeFixedI8(buf, pSchema->type);
288
    tlen += taosEncodeFixedI16(buf, pSchema->colId);
H
Hongze Cheng 已提交
289 290 291 292 293 294 295 296 297 298 299
    tlen += taosEncodeFixedI32(buf, pSchema->bytes);
    tlen += taosEncodeString(buf, pSchema->name);
  }

  return tlen;
}

static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW) {
  SSchema *pSchema;

  buf = taosDecodeFixedU32(buf, &pSW->nCols);
wafwerar's avatar
wafwerar 已提交
300
  pSW->pSchema = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * pSW->nCols);
H
Hongze Cheng 已提交
301 302 303
  for (int i = 0; i < pSW->nCols; i++) {
    pSchema = pSW->pSchema + i;
    buf = taosDecodeFixedI8(buf, &pSchema->type);
304
    buf = taosDecodeFixedI16(buf, &pSchema->colId);
H
Hongze Cheng 已提交
305 306 307 308 309 310 311
    buf = taosDecodeFixedI32(buf, &pSchema->bytes);
    buf = taosDecodeStringTo(buf, pSchema->name);
  }

  return buf;
}

H
more  
Hongze Cheng 已提交
312 313
static SMetaDB *metaNewDB() {
  SMetaDB *pDB = NULL;
wafwerar's avatar
wafwerar 已提交
314
  pDB = (SMetaDB *)taosMemoryCalloc(1, sizeof(*pDB));
H
more  
Hongze Cheng 已提交
315 316 317 318
  if (pDB == NULL) {
    return NULL;
  }

H
Hongze Cheng 已提交
319
#if IMPL_WITH_LOCK
wafwerar's avatar
wafwerar 已提交
320
  taosThreadRwlockInit(&pDB->rwlock, NULL);
H
Hongze Cheng 已提交
321 322
#endif

H
more  
Hongze Cheng 已提交
323 324 325 326 327
  return pDB;
}

static void metaFreeDB(SMetaDB *pDB) {
  if (pDB) {
H
Hongze Cheng 已提交
328
#if IMPL_WITH_LOCK
wafwerar's avatar
wafwerar 已提交
329
    taosThreadRwlockDestroy(&pDB->rwlock);
H
Hongze Cheng 已提交
330
#endif
wafwerar's avatar
wafwerar 已提交
331
    taosMemoryFree(pDB);
H
more  
Hongze Cheng 已提交
332 333 334 335 336 337 338 339 340 341 342 343 344 345 346
  }
}

static int metaOpenBDBEnv(DB_ENV **ppEnv, const char *path) {
  int     ret;
  DB_ENV *pEnv;

  if (path == NULL) return 0;

  ret = db_env_create(&pEnv, 0);
  if (ret != 0) {
    BDB_PERR("Failed to create META env", ret);
    return -1;
  }

H
Hongze Cheng 已提交
347
  ret = pEnv->open(pEnv, path, DB_CREATE | DB_INIT_CDB | DB_INIT_MPOOL, 0);
H
more  
Hongze Cheng 已提交
348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363
  if (ret != 0) {
    BDB_PERR("Failed to open META env", ret);
    return -1;
  }

  *ppEnv = pEnv;

  return 0;
}

static void metaCloseBDBEnv(DB_ENV *pEnv) {
  if (pEnv) {
    pEnv->close(pEnv, 0);
  }
}

H
Hongze Cheng 已提交
364
static int metaOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup) {
H
more  
Hongze Cheng 已提交
365 366 367
  int ret;
  DB *pDB;

H
Hongze Cheng 已提交
368
  ret = db_create(&(pDB), pEnv, 0);
H
more  
Hongze Cheng 已提交
369 370 371 372 373
  if (ret != 0) {
    BDB_PERR("Failed to create META DB", ret);
    return -1;
  }

H
Hongze Cheng 已提交
374 375 376 377 378 379 380 381
  if (isDup) {
    ret = pDB->set_flags(pDB, DB_DUPSORT);
    if (ret != 0) {
      BDB_PERR("Failed to set DB flags", ret);
      return -1;
    }
  }

H
more  
Hongze Cheng 已提交
382 383 384 385 386 387
  ret = pDB->open(pDB, NULL, pFName, NULL, DB_BTREE, DB_CREATE, 0);
  if (ret) {
    BDB_PERR("Failed to open META DB", ret);
    return -1;
  }

H
Hongze Cheng 已提交
388 389
  *ppDB = pDB;

H
more  
Hongze Cheng 已提交
390 391 392 393 394 395 396 397 398
  return 0;
}

static void metaCloseBDBDb(DB *pDB) {
  if (pDB) {
    pDB->close(pDB, 0);
  }
}

H
Hongze Cheng 已提交
399
static int metaOpenBDBIdx(DB **ppIdx, DB_ENV *pEnv, const char *pFName, DB *pDB, bdbIdxCbPtr cbf, bool isDup) {
H
Hongze Cheng 已提交
400 401 402
  DB *pIdx;
  int ret;

H
Hongze Cheng 已提交
403
  if (metaOpenBDBDb(ppIdx, pEnv, pFName, isDup) < 0) {
H
Hongze Cheng 已提交
404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422
    return -1;
  }

  pIdx = *ppIdx;
  ret = pDB->associate(pDB, NULL, pIdx, cbf, 0);
  if (ret) {
    BDB_PERR("Failed to associate META DB and Index", ret);
  }

  return 0;
}

static void metaCloseBDBIdx(DB *pIdx) {
  if (pIdx) {
    pIdx->close(pIdx, 0);
  }
}

static int metaNameIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
H
Hongze Cheng 已提交
423
  STbCfg *pTbCfg = (STbCfg *)(pValue->app_data);
H
Hongze Cheng 已提交
424

H
Hongze Cheng 已提交
425
  memset(pSKey, 0, sizeof(*pSKey));
H
Hongze Cheng 已提交
426

H
Hongze Cheng 已提交
427 428
  pSKey->data = pTbCfg->name;
  pSKey->size = strlen(pTbCfg->name);
H
Hongze Cheng 已提交
429 430 431 432

  return 0;
}

H
Hongze Cheng 已提交
433
static int metaStbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
H
Hongze Cheng 已提交
434
  STbCfg *pTbCfg = (STbCfg *)(pValue->app_data);
H
Hongze Cheng 已提交
435

H
Hongze Cheng 已提交
436
  if (pTbCfg->type == META_SUPER_TABLE) {
H
Hongze Cheng 已提交
437 438 439
    memset(pSKey, 0, sizeof(*pSKey));
    pSKey->data = pKey->data;
    pSKey->size = pKey->size;
H
Hongze Cheng 已提交
440

H
Hongze Cheng 已提交
441
    return 0;
H
Hongze Cheng 已提交
442
  } else {
H
Hongze Cheng 已提交
443
    return DB_DONOTINDEX;
H
Hongze Cheng 已提交
444
  }
H
Hongze Cheng 已提交
445 446
}

H
Hongze Cheng 已提交
447
static int metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
H
Hongze Cheng 已提交
448
  STbCfg *pTbCfg = (STbCfg *)(pValue->app_data);
H
Hongze Cheng 已提交
449

H
Hongze Cheng 已提交
450
  if (pTbCfg->type == META_NORMAL_TABLE) {
H
Hongze Cheng 已提交
451 452 453
    memset(pSKey, 0, sizeof(*pSKey));
    pSKey->data = pKey->data;
    pSKey->size = pKey->size;
H
Hongze Cheng 已提交
454

H
Hongze Cheng 已提交
455 456 457 458
    return 0;
  } else {
    return DB_DONOTINDEX;
  }
H
Hongze Cheng 已提交
459 460
}

H
Hongze Cheng 已提交
461
static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
H
Hongze Cheng 已提交
462
  STbCfg *pTbCfg = (STbCfg *)(pValue->app_data);
L
fix  
Liu Jicong 已提交
463
  DBT    *pDbt;
H
Hongze Cheng 已提交
464

H
Hongze Cheng 已提交
465
  if (pTbCfg->type == META_CHILD_TABLE) {
wafwerar's avatar
wafwerar 已提交
466
    // pDbt = taosMemoryCalloc(2, sizeof(DBT));
H
Hongze Cheng 已提交
467

H
Hongze Cheng 已提交
468 469 470
    // // First key is suid
    // pDbt[0].data = &(pTbCfg->ctbCfg.suid);
    // pDbt[0].size = sizeof(pTbCfg->ctbCfg.suid);
H
Hongze Cheng 已提交
471

H
Hongze Cheng 已提交
472 473 474 475
    // // Second key is the first tag
    // void *pTagVal = tdGetKVRowValOfCol(pTbCfg->ctbCfg.pTag, (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId);
    // pDbt[1].data = pTagVal;
    // pDbt[1].size = sizeof(int32_t);
H
Hongze Cheng 已提交
476

H
Hongze Cheng 已提交
477
    // Set index key
H
Hongze Cheng 已提交
478
    memset(pSKey, 0, sizeof(*pSKey));
H
Hongze Cheng 已提交
479
#if 0
H
Hongze Cheng 已提交
480 481 482
    pSKey->flags = DB_DBT_MULTIPLE | DB_DBT_APPMALLOC;
    pSKey->data = pDbt;
    pSKey->size = 2;
H
Hongze Cheng 已提交
483 484 485 486
#else
    pSKey->data = &(pTbCfg->ctbCfg.suid);
    pSKey->size = sizeof(pTbCfg->ctbCfg.suid);
#endif
H
Hongze Cheng 已提交
487

H
Hongze Cheng 已提交
488 489 490 491
    return 0;
  } else {
    return DB_DONOTINDEX;
  }
H
Hongze Cheng 已提交
492 493
}

C
Cary Xu 已提交
494
static int metaSmaIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
C
Cary Xu 已提交
495
  STSma *pSmaCfg = (STSma *)(pValue->app_data);
C
Cary Xu 已提交
496 497 498 499 500 501 502 503

  memset(pSKey, 0, sizeof(*pSKey));
  pSKey->data = &(pSmaCfg->tableUid);
  pSKey->size = sizeof(pSmaCfg->tableUid);

  return 0;
}

H
Hongze Cheng 已提交
504
static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) {
H
more  
Hongze Cheng 已提交
505 506 507 508 509
  int tsize = 0;

  tsize += taosEncodeString(buf, pTbCfg->name);
  tsize += taosEncodeFixedU32(buf, pTbCfg->ttl);
  tsize += taosEncodeFixedU32(buf, pTbCfg->keep);
C
Cary Xu 已提交
510
  tsize += taosEncodeFixedU8(buf, pTbCfg->info);
H
more  
Hongze Cheng 已提交
511 512

  if (pTbCfg->type == META_SUPER_TABLE) {
H
Hongze Cheng 已提交
513 514
    SSchemaWrapper sw = {.nCols = pTbCfg->stbCfg.nTagCols, .pSchema = pTbCfg->stbCfg.pTagSchema};
    tsize += metaEncodeSchema(buf, &sw);
H
more  
Hongze Cheng 已提交
515 516 517 518
  } else if (pTbCfg->type == META_CHILD_TABLE) {
    tsize += taosEncodeFixedU64(buf, pTbCfg->ctbCfg.suid);
    tsize += tdEncodeKVRow(buf, pTbCfg->ctbCfg.pTag);
  } else if (pTbCfg->type == META_NORMAL_TABLE) {
519
    // TODO
H
more  
Hongze Cheng 已提交
520 521 522 523 524
  } else {
    ASSERT(0);
  }

  return tsize;
H
Hongze Cheng 已提交
525 526 527
}

static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg) {
H
more  
Hongze Cheng 已提交
528 529 530
  buf = taosDecodeString(buf, &(pTbCfg->name));
  buf = taosDecodeFixedU32(buf, &(pTbCfg->ttl));
  buf = taosDecodeFixedU32(buf, &(pTbCfg->keep));
C
Cary Xu 已提交
531
  buf = taosDecodeFixedU8(buf, &(pTbCfg->info));
H
more  
Hongze Cheng 已提交
532 533

  if (pTbCfg->type == META_SUPER_TABLE) {
H
Hongze Cheng 已提交
534 535 536 537
    SSchemaWrapper sw;
    buf = metaDecodeSchema(buf, &sw);
    pTbCfg->stbCfg.nTagCols = sw.nCols;
    pTbCfg->stbCfg.pTagSchema = sw.pSchema;
H
more  
Hongze Cheng 已提交
538 539 540 541
  } else if (pTbCfg->type == META_CHILD_TABLE) {
    buf = taosDecodeFixedU64(buf, &(pTbCfg->ctbCfg.suid));
    buf = tdDecodeKVRow(buf, &(pTbCfg->ctbCfg.pTag));
  } else if (pTbCfg->type == META_NORMAL_TABLE) {
542
    // TODO
H
more  
Hongze Cheng 已提交
543 544 545 546
  } else {
    ASSERT(0);
  }
  return buf;
H
Hongze Cheng 已提交
547 548
}

H
Hongze Cheng 已提交
549
static void metaClearTbCfg(STbCfg *pTbCfg) {
wafwerar's avatar
wafwerar 已提交
550
  taosMemoryFreeClear(pTbCfg->name);
H
Hongze Cheng 已提交
551 552 553
  if (pTbCfg->type == META_SUPER_TABLE) {
    tdFreeSchema(pTbCfg->stbCfg.pTagSchema);
  } else if (pTbCfg->type == META_CHILD_TABLE) {
wafwerar's avatar
wafwerar 已提交
554
    taosMemoryFreeClear(pTbCfg->ctbCfg.pTag);
H
Hongze Cheng 已提交
555
  }
H
more  
Hongze Cheng 已提交
556 557 558
}

/* ------------------------ FOR QUERY ------------------------ */
H
Hongze Cheng 已提交
559
STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid) {
L
fix  
Liu Jicong 已提交
560
  STbCfg  *pTbCfg = NULL;
H
Hongze Cheng 已提交
561 562 563 564 565 566 567 568 569 570
  SMetaDB *pDB = pMeta->pDB;
  DBT      key = {0};
  DBT      value = {0};
  int      ret;

  // Set key/value
  key.data = &uid;
  key.size = sizeof(uid);

  // Query
H
Hongze Cheng 已提交
571
  metaDBRLock(pDB);
H
Hongze Cheng 已提交
572
  ret = pDB->pTbDB->get(pDB->pTbDB, NULL, &key, &value, 0);
H
Hongze Cheng 已提交
573
  metaDBULock(pDB);
H
Hongze Cheng 已提交
574 575 576 577 578
  if (ret != 0) {
    return NULL;
  }

  // Decode
wafwerar's avatar
wafwerar 已提交
579
  pTbCfg = (STbCfg *)taosMemoryMalloc(sizeof(*pTbCfg));
H
Hongze Cheng 已提交
580 581 582
  if (pTbCfg == NULL) {
    return NULL;
  }
H
more  
Hongze Cheng 已提交
583

H
Hongze Cheng 已提交
584 585 586 587 588
  metaDecodeTbInfo(value.data, pTbCfg);

  return pTbCfg;
}

H
Hongze Cheng 已提交
589
STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) {
L
fix  
Liu Jicong 已提交
590
  STbCfg  *pTbCfg = NULL;
H
Hongze Cheng 已提交
591 592 593 594 595 596 597
  SMetaDB *pDB = pMeta->pDB;
  DBT      key = {0};
  DBT      pkey = {0};
  DBT      pvalue = {0};
  int      ret;

  // Set key/value
H
more  
Hongze Cheng 已提交
598
  key.data = tbname;
H
Hongze Cheng 已提交
599
  key.size = strlen(tbname);
H
more  
Hongze Cheng 已提交
600

H
Hongze Cheng 已提交
601
  // Query
H
Hongze Cheng 已提交
602
  metaDBRLock(pDB);
H
Hongze Cheng 已提交
603
  ret = pDB->pNameIdx->pget(pDB->pNameIdx, NULL, &key, &pkey, &pvalue, 0);
H
Hongze Cheng 已提交
604
  metaDBULock(pDB);
H
Hongze Cheng 已提交
605
  if (ret != 0) {
H
Hongze Cheng 已提交
606
    return NULL;
H
Hongze Cheng 已提交
607 608
  }

H
Hongze Cheng 已提交
609 610
  // Decode
  *uid = *(tb_uid_t *)(pkey.data);
wafwerar's avatar
wafwerar 已提交
611
  pTbCfg = (STbCfg *)taosMemoryMalloc(sizeof(*pTbCfg));
H
Hongze Cheng 已提交
612 613 614 615 616 617 618
  if (pTbCfg == NULL) {
    return NULL;
  }

  metaDecodeTbInfo(pvalue.data, pTbCfg);

  return pTbCfg;
H
more  
Hongze Cheng 已提交
619 620
}

C
Cary Xu 已提交
621
STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
C
Cary Xu 已提交
622
  STSma *  pCfg = NULL;
C
Cary Xu 已提交
623 624 625 626 627 628
  SMetaDB *pDB = pMeta->pDB;
  DBT      key = {0};
  DBT      value = {0};
  int      ret;

  // Set key/value
C
Cary Xu 已提交
629 630
  key.data = (void *)&indexUid;
  key.size = sizeof(indexUid);
C
Cary Xu 已提交
631 632 633 634 635 636 637 638 639 640

  // Query
  metaDBRLock(pDB);
  ret = pDB->pTbDB->get(pDB->pSmaDB, NULL, &key, &value, 0);
  metaDBULock(pDB);
  if (ret != 0) {
    return NULL;
  }

  // Decode
wafwerar's avatar
wafwerar 已提交
641
  pCfg = (STSma *)taosMemoryCalloc(1, sizeof(STSma));
C
Cary Xu 已提交
642 643 644 645
  if (pCfg == NULL) {
    return NULL;
  }

C
Cary Xu 已提交
646
  if (tDecodeTSma(value.data, pCfg) == NULL) {
wafwerar's avatar
wafwerar 已提交
647
    taosMemoryFreeClear(pCfg);
C
Cary Xu 已提交
648 649
    return NULL;
  }
C
Cary Xu 已提交
650 651 652 653

  return pCfg;
}

H
Hongze Cheng 已提交
654
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) {
H
more  
Hongze Cheng 已提交
655 656
  uint32_t        nCols;
  SSchemaWrapper *pSW = NULL;
L
fix  
Liu Jicong 已提交
657
  SMetaDB        *pDB = pMeta->pDB;
H
more  
Hongze Cheng 已提交
658
  int             ret;
L
fix  
Liu Jicong 已提交
659 660
  void           *pBuf;
  SSchema        *pSchema;
H
Hongze Cheng 已提交
661
  SSchemaKey      schemaKey = {uid, sver, 0};
H
more  
Hongze Cheng 已提交
662 663 664 665 666 667 668 669
  DBT             key = {0};
  DBT             value = {0};

  // Set key/value properties
  key.data = &schemaKey;
  key.size = sizeof(schemaKey);

  // Query
H
Hongze Cheng 已提交
670
  metaDBRLock(pDB);
H
more  
Hongze Cheng 已提交
671
  ret = pDB->pSchemaDB->get(pDB->pSchemaDB, NULL, &key, &value, 0);
H
Hongze Cheng 已提交
672
  metaDBULock(pDB);
H
more  
Hongze Cheng 已提交
673
  if (ret != 0) {
H
Hongze Cheng 已提交
674
    printf("failed to query schema DB since %s================\n", db_strerror(ret));
H
more  
Hongze Cheng 已提交
675 676 677 678 679
    return NULL;
  }

  // Decode the schema
  pBuf = value.data;
wafwerar's avatar
wafwerar 已提交
680
  pSW = taosMemoryMalloc(sizeof(*pSW));
H
Hongze Cheng 已提交
681
  metaDecodeSchema(pBuf, pSW);
H
more  
Hongze Cheng 已提交
682 683

  return pSW;
H
more  
Hongze Cheng 已提交
684 685 686 687 688 689 690 691
}

struct SMTbCursor {
  DBC *pCur;
};

SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
  SMTbCursor *pTbCur = NULL;
L
fix  
Liu Jicong 已提交
692
  SMetaDB    *pDB = pMeta->pDB;
H
more  
Hongze Cheng 已提交
693

wafwerar's avatar
wafwerar 已提交
694
  pTbCur = (SMTbCursor *)taosMemoryCalloc(1, sizeof(*pTbCur));
H
more  
Hongze Cheng 已提交
695 696 697 698 699 700
  if (pTbCur == NULL) {
    return NULL;
  }

  pDB->pTbDB->cursor(pDB->pTbDB, NULL, &(pTbCur->pCur), 0);

H
more  
Hongze Cheng 已提交
701 702 703 704 705 706
#if 0
    DB_BTREE_STAT *sp;
    pDB->pTbDB->stat(pDB->pTbDB, NULL, &sp, 0);
    printf("**************** %ld\n", sp->bt_nkeys);
#endif

H
more  
Hongze Cheng 已提交
707 708 709
  return pTbCur;
}

D
dapan1121 已提交
710 711 712 713 714 715 716 717 718 719 720 721
int metaGetTbNum(SMeta *pMeta) {
  SMetaDB    *pDB = pMeta->pDB;

  DB_BTREE_STAT *sp1;
  pDB->pTbDB->stat(pDB->pNtbIdx, NULL, &sp1, 0);
  
  DB_BTREE_STAT *sp2;
  pDB->pTbDB->stat(pDB->pCtbIdx, NULL, &sp2, 0);
  
  return sp1->bt_nkeys + sp2->bt_nkeys;
}

H
more  
Hongze Cheng 已提交
722 723 724 725 726
void metaCloseTbCursor(SMTbCursor *pTbCur) {
  if (pTbCur) {
    if (pTbCur->pCur) {
      pTbCur->pCur->close(pTbCur->pCur);
    }
wafwerar's avatar
wafwerar 已提交
727
    taosMemoryFree(pTbCur);
H
more  
Hongze Cheng 已提交
728 729 730 731 732 733 734
  }
}

char *metaTbCursorNext(SMTbCursor *pTbCur) {
  DBT    key = {0};
  DBT    value = {0};
  STbCfg tbCfg;
L
fix  
Liu Jicong 已提交
735
  void  *pBuf;
H
more  
Hongze Cheng 已提交
736

H
Hongze Cheng 已提交
737 738 739 740 741
  for (;;) {
    if (pTbCur->pCur->get(pTbCur->pCur, &key, &value, DB_NEXT) == 0) {
      pBuf = value.data;
      metaDecodeTbInfo(pBuf, &tbCfg);
      if (tbCfg.type == META_SUPER_TABLE) {
wafwerar's avatar
wafwerar 已提交
742 743
        taosMemoryFree(tbCfg.name);
        taosMemoryFree(tbCfg.stbCfg.pTagSchema);
H
Hongze Cheng 已提交
744
        continue;
H
Hongze Cheng 已提交
745
      } else if (tbCfg.type == META_CHILD_TABLE) {
H
Hongze Cheng 已提交
746
        kvRowFree(tbCfg.ctbCfg.pTag);
H
Hongze Cheng 已提交
747 748 749 750 751
      }
      return tbCfg.name;
    } else {
      return NULL;
    }
H
more  
Hongze Cheng 已提交
752
  }
H
more  
Hongze Cheng 已提交
753 754 755 756
}

STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
  STSchemaBuilder sb;
L
fix  
Liu Jicong 已提交
757 758
  STSchema       *pTSchema = NULL;
  SSchema        *pSchema;
H
more  
Hongze Cheng 已提交
759
  SSchemaWrapper *pSW;
L
fix  
Liu Jicong 已提交
760
  STbCfg         *pTbCfg;
H
Hongze Cheng 已提交
761
  tb_uid_t        quid;
H
more  
Hongze Cheng 已提交
762

H
Hongze Cheng 已提交
763 764 765 766 767 768 769 770
  pTbCfg = metaGetTbInfoByUid(pMeta, uid);
  if (pTbCfg->type == META_CHILD_TABLE) {
    quid = pTbCfg->ctbCfg.suid;
  } else {
    quid = uid;
  }

  pSW = metaGetTableSchema(pMeta, quid, sver, true);
H
more  
Hongze Cheng 已提交
771 772 773 774 775 776 777 778 779 780 781 782 783 784
  if (pSW == NULL) {
    return NULL;
  }

  // Rebuild a schema
  tdInitTSchemaBuilder(&sb, 0);
  for (int32_t i = 0; i < pSW->nCols; i++) {
    pSchema = pSW->pSchema + i;
    tdAddColToSchema(&sb, pSchema->type, pSchema->colId, pSchema->bytes);
  }
  pTSchema = tdGetSchemaFromBuilder(&sb);
  tdDestroyTSchemaBuilder(&sb);

  return pTSchema;
H
more  
Hongze Cheng 已提交
785 786 787
}

struct SMCtbCursor {
L
fix  
Liu Jicong 已提交
788
  DBC     *pCur;
H
more  
Hongze Cheng 已提交
789 790 791 792 793
  tb_uid_t suid;
};

SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
  SMCtbCursor *pCtbCur = NULL;
L
fix  
Liu Jicong 已提交
794
  SMetaDB     *pDB = pMeta->pDB;
H
more  
Hongze Cheng 已提交
795 796
  int          ret;

wafwerar's avatar
wafwerar 已提交
797
  pCtbCur = (SMCtbCursor *)taosMemoryCalloc(1, sizeof(*pCtbCur));
H
more  
Hongze Cheng 已提交
798 799 800 801 802 803 804
  if (pCtbCur == NULL) {
    return NULL;
  }

  pCtbCur->suid = uid;
  ret = pDB->pCtbIdx->cursor(pDB->pCtbIdx, NULL, &(pCtbCur->pCur), 0);
  if (ret != 0) {
wafwerar's avatar
wafwerar 已提交
805
    taosMemoryFree(pCtbCur);
H
more  
Hongze Cheng 已提交
806 807 808 809 810 811 812 813 814 815 816 817
    return NULL;
  }

  return pCtbCur;
}

void metaCloseCtbCurosr(SMCtbCursor *pCtbCur) {
  if (pCtbCur) {
    if (pCtbCur->pCur) {
      pCtbCur->pCur->close(pCtbCur->pCur);
    }

wafwerar's avatar
wafwerar 已提交
818
    taosMemoryFree(pCtbCur);
H
more  
Hongze Cheng 已提交
819 820 821
  }
}

822
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
H
more  
Hongze Cheng 已提交
823 824 825
  DBT    skey = {0};
  DBT    pkey = {0};
  DBT    pval = {0};
L
fix  
Liu Jicong 已提交
826
  void  *pBuf;
H
more  
Hongze Cheng 已提交
827 828 829 830 831 832
  STbCfg tbCfg;

  // Set key
  skey.data = &(pCtbCur->suid);
  skey.size = sizeof(pCtbCur->suid);

833 834 835 836
  if (pCtbCur->pCur->pget(pCtbCur->pCur, &skey, &pkey, &pval, DB_NEXT) == 0) {
    tb_uid_t id = *(tb_uid_t *)pkey.data;
    assert(id != 0);
    return id;
H
Hongze Cheng 已提交
837 838
    //    metaDecodeTbInfo(pBuf, &tbCfg);
    //    return tbCfg.;
H
more  
Hongze Cheng 已提交
839
  } else {
840
    return 0;
H
more  
Hongze Cheng 已提交
841
  }
H
Hongze Cheng 已提交
842 843
}

C
Cary Xu 已提交
844 845 846 847 848 849 850 851 852 853
struct SMSmaCursor {
  DBC     *pCur;
  tb_uid_t uid;
};

SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) {
  SMSmaCursor *pCur = NULL;
  SMetaDB     *pDB = pMeta->pDB;
  int          ret;

wafwerar's avatar
wafwerar 已提交
854
  pCur = (SMSmaCursor *)taosMemoryCalloc(1, sizeof(*pCur));
C
Cary Xu 已提交
855 856 857 858 859
  if (pCur == NULL) {
    return NULL;
  }

  pCur->uid = uid;
C
Cary Xu 已提交
860
  // TODO: lock?
C
Cary Xu 已提交
861 862
  ret = pDB->pCtbIdx->cursor(pDB->pSmaIdx, NULL, &(pCur->pCur), 0);
  if (ret != 0) {
wafwerar's avatar
wafwerar 已提交
863
    taosMemoryFree(pCur);
C
Cary Xu 已提交
864 865 866 867 868 869 870 871 872 873 874 875
    return NULL;
  }

  return pCur;
}

void metaCloseSmaCurosr(SMSmaCursor *pCur) {
  if (pCur) {
    if (pCur->pCur) {
      pCur->pCur->close(pCur->pCur);
    }

wafwerar's avatar
wafwerar 已提交
876
    taosMemoryFree(pCur);
C
Cary Xu 已提交
877 878 879
  }
}

C
Cary Xu 已提交
880 881 882 883
const char *metaSmaCursorNext(SMSmaCursor *pCur) {
  DBT skey = {0};
  DBT pkey = {0};
  DBT pval = {0};
C
Cary Xu 已提交
884 885 886 887

  // Set key
  skey.data = &(pCur->uid);
  skey.size = sizeof(pCur->uid);
C
Cary Xu 已提交
888
  // TODO: lock?
C
Cary Xu 已提交
889
  if (pCur->pCur->pget(pCur->pCur, &skey, &pkey, &pval, DB_NEXT) == 0) {
C
Cary Xu 已提交
890
    const char *indexName = (const char *)pkey.data;
C
Cary Xu 已提交
891 892 893
    assert(indexName != NULL);
    return indexName;
  } else {
C
Cary Xu 已提交
894
    return NULL;
C
Cary Xu 已提交
895 896 897
  }
}

C
Cary Xu 已提交
898
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) {
C
Cary Xu 已提交
899 900
  STSmaWrapper *pSW = NULL;

wafwerar's avatar
wafwerar 已提交
901
  pSW = taosMemoryCalloc(1, sizeof(*pSW));
C
Cary Xu 已提交
902 903 904 905 906 907
  if (pSW == NULL) {
    return NULL;
  }

  SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid);
  if (pCur == NULL) {
wafwerar's avatar
wafwerar 已提交
908
    taosMemoryFree(pSW);
C
Cary Xu 已提交
909 910 911
    return NULL;
  }

C
Cary Xu 已提交
912 913
  DBT   skey = {.data = &(pCur->uid), .size = sizeof(pCur->uid)};
  DBT   pval = {0};
C
Cary Xu 已提交
914 915 916 917 918 919
  void *pBuf = NULL;

  while (true) {
    // TODO: lock?
    if (pCur->pCur->pget(pCur->pCur, &skey, NULL, &pval, DB_NEXT) == 0) {
      ++pSW->number;
wafwerar's avatar
wafwerar 已提交
920
      STSma *tptr = (STSma *)taosMemoryRealloc(pSW->tSma, pSW->number * sizeof(STSma));
C
Cary Xu 已提交
921 922
      if (tptr == NULL) {
        metaCloseSmaCurosr(pCur);
C
Cary Xu 已提交
923
        tdDestroyTSmaWrapper(pSW);
wafwerar's avatar
wafwerar 已提交
924
        taosMemoryFreeClear(pSW);
C
Cary Xu 已提交
925 926 927 928 929 930
        return NULL;
      }
      pSW->tSma = tptr;
      pBuf = pval.data;
      if (tDecodeTSma(pBuf, pSW->tSma + pSW->number - 1) == NULL) {
        metaCloseSmaCurosr(pCur);
C
Cary Xu 已提交
931
        tdDestroyTSmaWrapper(pSW);
wafwerar's avatar
wafwerar 已提交
932
        taosMemoryFreeClear(pSW);
C
Cary Xu 已提交
933 934 935 936 937 938 939 940
        return NULL;
      }
      continue;
    }
    break;
  }

  metaCloseSmaCurosr(pCur);
C
Cary Xu 已提交
941
  
C
Cary Xu 已提交
942 943
  return pSW;
}
C
Cary Xu 已提交
944

C
Cary Xu 已提交
945 946 947 948 949
SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup) {
  SArray * pUids = NULL;
  SMetaDB *pDB = pMeta->pDB;
  DBC *    pCur = NULL;
  DBT      pkey = {0}, pval = {0};
950
  uint32_t mode = isDup ? DB_NEXT_DUP : DB_NEXT_NODUP;
C
Cary Xu 已提交
951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968
  int      ret;

  pUids = taosArrayInit(16, sizeof(tb_uid_t));

  if (!pUids) {
    return NULL;
  }

  // TODO: lock?
  ret = pDB->pCtbIdx->cursor(pDB->pSmaIdx, NULL, &pCur, 0);
  if (ret != 0) {
    taosArrayDestroy(pUids);
    return NULL;
  }

  void *pBuf = NULL;

  // TODO: lock?
969
  while ((ret = pCur->get(pCur, &pkey, &pval, mode)) == 0) {
C
Cary Xu 已提交
970 971 972 973 974 975 976 977 978 979
      taosArrayPush(pUids, pkey.data);
  }

  if (pCur) {
    pCur->close(pCur);
  }

  return pUids;
}

H
Hongze Cheng 已提交
980 981
static void metaDBWLock(SMetaDB *pDB) {
#if IMPL_WITH_LOCK
wafwerar's avatar
wafwerar 已提交
982
  taosThreadRwlockWrlock(&(pDB->rwlock));
H
Hongze Cheng 已提交
983 984 985 986 987
#endif
}

static void metaDBRLock(SMetaDB *pDB) {
#if IMPL_WITH_LOCK
wafwerar's avatar
wafwerar 已提交
988
  taosThreadRwlockRdlock(&(pDB->rwlock));
H
Hongze Cheng 已提交
989 990 991 992 993
#endif
}

static void metaDBULock(SMetaDB *pDB) {
#if IMPL_WITH_LOCK
wafwerar's avatar
wafwerar 已提交
994
  taosThreadRwlockUnlock(&(pDB->rwlock));
H
Hongze Cheng 已提交
995 996
#endif
}