metaBDBImpl.c 25.1 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#define ALLOW_FORBID_FUNC
H
more  
Hongze Cheng 已提交
17 18
#include "db.h"

H
Hongze Cheng 已提交
19 20
#include "metaDef.h"

H
Hongze Cheng 已提交
21
#include "tcoding.h"
H
more  
Hongze Cheng 已提交
22
#include "thash.h"
H
Hongze Cheng 已提交
23

H
Hongze Cheng 已提交
24 25 26 27
#define IMPL_WITH_LOCK 1
// #if IMPL_WITH_LOCK
// #endif

H
Hongze Cheng 已提交
28 29 30
typedef struct {
  tb_uid_t uid;
  int32_t  sver;
H
Hongze Cheng 已提交
31
  int32_t  padding;
H
Hongze Cheng 已提交
32 33
} SSchemaKey;

H
Hongze Cheng 已提交
34
struct SMetaDB {
H
Hongze Cheng 已提交
35
#if IMPL_WITH_LOCK
wafwerar's avatar
wafwerar 已提交
36
  TdThreadRwlock rwlock;
H
Hongze Cheng 已提交
37
#endif
H
more  
Hongze Cheng 已提交
38
  // DB
H
more  
Hongze Cheng 已提交
39 40
  DB *pTbDB;
  DB *pSchemaDB;
C
Cary Xu 已提交
41
  DB *pSmaDB;
C
Cary Xu 已提交
42

H
more  
Hongze Cheng 已提交
43
  // IDX
H
more  
Hongze Cheng 已提交
44 45 46 47
  DB *pNameIdx;
  DB *pStbIdx;
  DB *pNtbIdx;
  DB *pCtbIdx;
C
Cary Xu 已提交
48
  DB *pSmaIdx;
H
more  
Hongze Cheng 已提交
49
  // ENV
H
Hongze Cheng 已提交
50 51 52
  DB_ENV *pEvn;
};

H
Hongze Cheng 已提交
53 54
typedef int (*bdbIdxCbPtr)(DB *, const DBT *, const DBT *, DBT *);

H
more  
Hongze Cheng 已提交
55 56
static SMetaDB *metaNewDB();
static void     metaFreeDB(SMetaDB *pDB);
H
more  
Hongze Cheng 已提交
57 58
static int      metaOpenBDBEnv(DB_ENV **ppEnv, const char *path);
static void     metaCloseBDBEnv(DB_ENV *pEnv);
H
Hongze Cheng 已提交
59
static int      metaOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup);
H
more  
Hongze Cheng 已提交
60
static void     metaCloseBDBDb(DB *pDB);
H
Hongze Cheng 已提交
61
static int      metaOpenBDBIdx(DB **ppIdx, DB_ENV *pEnv, const char *pFName, DB *pDB, bdbIdxCbPtr cbf, bool isDup);
H
Hongze Cheng 已提交
62
static void     metaCloseBDBIdx(DB *pIdx);
H
Hongze Cheng 已提交
63 64 65 66
static int      metaNameIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
static int      metaStbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
static int      metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
static int      metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
C
Cary Xu 已提交
67
static int      metaSmaIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
H
Hongze Cheng 已提交
68
static int      metaEncodeTbInfo(void **buf, STbCfg *pTbCfg);
L
fix  
Liu Jicong 已提交
69
static void    *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg);
H
Hongze Cheng 已提交
70
static void     metaClearTbCfg(STbCfg *pTbCfg);
H
Hongze Cheng 已提交
71
static int      metaEncodeSchema(void **buf, SSchemaWrapper *pSW);
L
fix  
Liu Jicong 已提交
72
static void    *metaDecodeSchema(void *buf, SSchemaWrapper *pSW);
C
Cary Xu 已提交
73 74
static int      metaEncodeSchemaEx(void **buf, SSchemaWrapper *pSW);
static void    *metaDecodeSchemaEx(void *buf, SSchemaWrapper *pSW, bool isGetEx);
H
Hongze Cheng 已提交
75 76 77
static void     metaDBWLock(SMetaDB *pDB);
static void     metaDBRLock(SMetaDB *pDB);
static void     metaDBULock(SMetaDB *pDB);
C
Cary Xu 已提交
78
static SSchemaWrapper *metaGetTableSchemaImpl(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline, bool isGetEx);
H
more  
Hongze Cheng 已提交
79

H
more  
Hongze Cheng 已提交
80
#define BDB_PERR(info, code) fprintf(stderr, info " reason: %s", db_strerror(code))
H
Hongze Cheng 已提交
81

H
Hongze Cheng 已提交
82
int metaOpenDB(SMeta *pMeta) {
H
more  
Hongze Cheng 已提交
83
  SMetaDB *pDB;
H
Hongze Cheng 已提交
84

H
more  
Hongze Cheng 已提交
85 86 87
  // Create DB object
  pDB = metaNewDB();
  if (pDB == NULL) {
H
more  
Hongze Cheng 已提交
88 89 90
    return -1;
  }

H
more  
Hongze Cheng 已提交
91
  pMeta->pDB = pDB;
H
more  
Hongze Cheng 已提交
92

H
more  
Hongze Cheng 已提交
93 94
  // Open DB Env
  if (metaOpenBDBEnv(&(pDB->pEvn), pMeta->path) < 0) {
H
more  
Hongze Cheng 已提交
95
    metaCloseDB(pMeta);
H
more  
Hongze Cheng 已提交
96 97
    return -1;
  }
H
Hongze Cheng 已提交
98

H
more  
Hongze Cheng 已提交
99
  // Open DBs
H
Hongze Cheng 已提交
100
  if (metaOpenBDBDb(&(pDB->pTbDB), pDB->pEvn, "meta.db", false) < 0) {
H
more  
Hongze Cheng 已提交
101 102
    metaCloseDB(pMeta);
    return -1;
H
more  
Hongze Cheng 已提交
103 104
  }

H
more  
Hongze Cheng 已提交
105
  if (metaOpenBDBDb(&(pDB->pSchemaDB), pDB->pEvn, "schema.db", false) < 0) {
H
more  
Hongze Cheng 已提交
106 107 108
    metaCloseDB(pMeta);
    return -1;
  }
H
Hongze Cheng 已提交
109

C
Cary Xu 已提交
110 111 112 113 114
  if (metaOpenBDBDb(&(pDB->pSmaDB), pDB->pEvn, "sma.db", false) < 0) {
    metaCloseDB(pMeta);
    return -1;
  }

H
more  
Hongze Cheng 已提交
115
  // Open Indices
H
Hongze Cheng 已提交
116
  if (metaOpenBDBIdx(&(pDB->pNameIdx), pDB->pEvn, "name.index", pDB->pTbDB, &metaNameIdxCb, false) < 0) {
H
more  
Hongze Cheng 已提交
117 118 119
    metaCloseDB(pMeta);
    return -1;
  }
H
Hongze Cheng 已提交
120

H
Hongze Cheng 已提交
121
  if (metaOpenBDBIdx(&(pDB->pStbIdx), pDB->pEvn, "stb.index", pDB->pTbDB, &metaStbIdxCb, false) < 0) {
H
more  
Hongze Cheng 已提交
122 123 124
    metaCloseDB(pMeta);
    return -1;
  }
H
Hongze Cheng 已提交
125

H
Hongze Cheng 已提交
126
  if (metaOpenBDBIdx(&(pDB->pNtbIdx), pDB->pEvn, "ntb.index", pDB->pTbDB, &metaNtbIdxCb, false) < 0) {
H
more  
Hongze Cheng 已提交
127 128
    metaCloseDB(pMeta);
    return -1;
H
Hongze Cheng 已提交
129 130
  }

H
Hongze Cheng 已提交
131
  if (metaOpenBDBIdx(&(pDB->pCtbIdx), pDB->pEvn, "ctb.index", pDB->pTbDB, &metaCtbIdxCb, true) < 0) {
H
more  
Hongze Cheng 已提交
132 133
    metaCloseDB(pMeta);
    return -1;
H
more  
Hongze Cheng 已提交
134 135
  }

C
Cary Xu 已提交
136 137 138 139 140
  if (metaOpenBDBIdx(&(pDB->pSmaIdx), pDB->pEvn, "sma.index", pDB->pSmaDB, &metaSmaIdxCb, true) < 0) {
    metaCloseDB(pMeta);
    return -1;
  }

H
Hongze Cheng 已提交
141 142 143 144 145
  return 0;
}

void metaCloseDB(SMeta *pMeta) {
  if (pMeta->pDB) {
C
Cary Xu 已提交
146
    metaCloseBDBIdx(pMeta->pDB->pSmaIdx);
H
Hongze Cheng 已提交
147 148 149 150
    metaCloseBDBIdx(pMeta->pDB->pCtbIdx);
    metaCloseBDBIdx(pMeta->pDB->pNtbIdx);
    metaCloseBDBIdx(pMeta->pDB->pStbIdx);
    metaCloseBDBIdx(pMeta->pDB->pNameIdx);
C
Cary Xu 已提交
151
    metaCloseBDBDb(pMeta->pDB->pSmaDB);
H
Hongze Cheng 已提交
152 153
    metaCloseBDBDb(pMeta->pDB->pSchemaDB);
    metaCloseBDBDb(pMeta->pDB->pTbDB);
H
more  
Hongze Cheng 已提交
154
    metaCloseBDBEnv(pMeta->pDB->pEvn);
H
more  
Hongze Cheng 已提交
155 156
    metaFreeDB(pMeta->pDB);
    pMeta->pDB = NULL;
H
Hongze Cheng 已提交
157 158 159
  }
}

H
more  
Hongze Cheng 已提交
160
int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
C
Cary Xu 已提交
161 162 163 164 165 166 167
  tb_uid_t   uid;
  char       buf[512];
  char       buf1[512];
  void      *pBuf;
  DBT        key1, value1;
  DBT        key2, value2;
  SSchemaEx *pSchema = NULL;
H
Hongze Cheng 已提交
168 169 170 171 172 173 174 175 176

  if (pTbCfg->type == META_SUPER_TABLE) {
    uid = pTbCfg->stbCfg.suid;
  } else {
    uid = metaGenerateUid(pMeta);
  }

  {
    // save table info
H
Hongze Cheng 已提交
177
    pBuf = buf;
H
Hongze Cheng 已提交
178 179
    memset(&key1, 0, sizeof(key1));
    memset(&value1, 0, sizeof(key1));
H
Hongze Cheng 已提交
180

H
Hongze Cheng 已提交
181 182
    key1.data = &uid;
    key1.size = sizeof(uid);
H
Hongze Cheng 已提交
183

H
Hongze Cheng 已提交
184
    metaEncodeTbInfo(&pBuf, pTbCfg);
H
Hongze Cheng 已提交
185

H
Hongze Cheng 已提交
186 187 188
    value1.data = buf;
    value1.size = POINTER_DISTANCE(pBuf, buf);
    value1.app_data = pTbCfg;
H
Hongze Cheng 已提交
189 190 191
  }

  // save schema
H
more  
Hongze Cheng 已提交
192
  uint32_t ncols;
H
Hongze Cheng 已提交
193
  if (pTbCfg->type == META_SUPER_TABLE) {
H
more  
Hongze Cheng 已提交
194
    ncols = pTbCfg->stbCfg.nCols;
H
Hongze Cheng 已提交
195 196
    pSchema = pTbCfg->stbCfg.pSchema;
  } else if (pTbCfg->type == META_NORMAL_TABLE) {
H
more  
Hongze Cheng 已提交
197
    ncols = pTbCfg->ntbCfg.nCols;
H
Hongze Cheng 已提交
198 199 200 201
    pSchema = pTbCfg->ntbCfg.pSchema;
  }

  if (pSchema) {
H
fix  
Hongze Cheng 已提交
202
    pBuf = buf1;
H
Hongze Cheng 已提交
203 204
    memset(&key2, 0, sizeof(key2));
    memset(&value2, 0, sizeof(key2));
H
Hongze Cheng 已提交
205
    SSchemaKey schemaKey = {uid, 0 /*TODO*/, 0};
H
Hongze Cheng 已提交
206

H
Hongze Cheng 已提交
207 208
    key2.data = &schemaKey;
    key2.size = sizeof(schemaKey);
H
Hongze Cheng 已提交
209

C
Cary Xu 已提交
210 211
    SSchemaWrapper sw = {.nCols = ncols, .pSchemaEx = pSchema};
    metaEncodeSchemaEx(&pBuf, &sw);
H
Hongze Cheng 已提交
212

H
fix  
Hongze Cheng 已提交
213 214
    value2.data = buf1;
    value2.size = POINTER_DISTANCE(pBuf, buf1);
H
Hongze Cheng 已提交
215 216
  }

H
Hongze Cheng 已提交
217 218
  metaDBWLock(pMeta->pDB);
  pMeta->pDB->pTbDB->put(pMeta->pDB->pTbDB, NULL, &key1, &value1, 0);
H
fix  
Hongze Cheng 已提交
219 220 221
  if (pSchema) {
    pMeta->pDB->pSchemaDB->put(pMeta->pDB->pSchemaDB, NULL, &key2, &value2, 0);
  }
H
Hongze Cheng 已提交
222 223
  metaDBULock(pMeta->pDB);

H
more  
Hongze Cheng 已提交
224 225 226 227 228 229 230 231
  return 0;
}

int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) {
  // TODO
  return 0;
}

C
Cary Xu 已提交
232
int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
C
Cary Xu 已提交
233 234
  // char  buf[512] = {0};  // TODO: may overflow
  void *pBuf = NULL, *qBuf = NULL;
C
Cary Xu 已提交
235 236
  DBT   key1 = {0}, value1 = {0};

237 238
  // save sma info
  int32_t len = tEncodeTSma(NULL, pSmaCfg);
239
  pBuf = taosMemoryCalloc(1, len);
240 241 242
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
C
Cary Xu 已提交
243 244
  }

245 246 247 248 249 250 251 252 253 254
  key1.data = (void *)&pSmaCfg->indexUid;
  key1.size = sizeof(pSmaCfg->indexUid);

  qBuf = pBuf;
  tEncodeTSma(&qBuf, pSmaCfg);

  value1.data = pBuf;
  value1.size = POINTER_DISTANCE(qBuf, pBuf);
  value1.app_data = pSmaCfg;

C
Cary Xu 已提交
255 256 257 258
  metaDBWLock(pMeta->pDB);
  pMeta->pDB->pSmaDB->put(pMeta->pDB->pSmaDB, NULL, &key1, &value1, 0);
  metaDBULock(pMeta->pDB);

259
  // release
wafwerar's avatar
wafwerar 已提交
260
  taosMemoryFreeClear(pBuf);
261

C
Cary Xu 已提交
262 263 264
  return 0;
}

C
Cary Xu 已提交
265
int metaRemoveSmaFromDb(SMeta *pMeta,  int64_t indexUid) {
C
Cary Xu 已提交
266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281
  // TODO
#if 0
  DBT key = {0};

  key.data = (void *)indexName;
  key.size = strlen(indexName);

  metaDBWLock(pMeta->pDB);
  // TODO: No guarantee of consistence.
  // Use transaction or DB->sync() for some guarantee.
  pMeta->pDB->pSmaDB->del(pMeta->pDB->pSmaDB, NULL, &key, 0);
  metaDBULock(pMeta->pDB);
#endif
  return 0;
}

H
more  
Hongze Cheng 已提交
282
/* ------------------------ STATIC METHODS ------------------------ */
H
Hongze Cheng 已提交
283 284 285 286 287 288 289 290
static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW) {
  int      tlen = 0;
  SSchema *pSchema;

  tlen += taosEncodeFixedU32(buf, pSW->nCols);
  for (int i = 0; i < pSW->nCols; i++) {
    pSchema = pSW->pSchema + i;
    tlen += taosEncodeFixedI8(buf, pSchema->type);
291
    tlen += taosEncodeFixedI16(buf, pSchema->colId);
H
Hongze Cheng 已提交
292 293 294 295 296 297 298 299 300 301 302
    tlen += taosEncodeFixedI32(buf, pSchema->bytes);
    tlen += taosEncodeString(buf, pSchema->name);
  }

  return tlen;
}

static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW) {
  SSchema *pSchema;

  buf = taosDecodeFixedU32(buf, &pSW->nCols);
wafwerar's avatar
wafwerar 已提交
303
  pSW->pSchema = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * pSW->nCols);
C
Cary Xu 已提交
304 305

  int8_t dummy;
H
Hongze Cheng 已提交
306 307 308
  for (int i = 0; i < pSW->nCols; i++) {
    pSchema = pSW->pSchema + i;
    buf = taosDecodeFixedI8(buf, &pSchema->type);
309
    buf = taosDecodeFixedI16(buf, &pSchema->colId);
H
Hongze Cheng 已提交
310 311 312 313 314 315 316
    buf = taosDecodeFixedI32(buf, &pSchema->bytes);
    buf = taosDecodeStringTo(buf, pSchema->name);
  }

  return buf;
}

C
Cary Xu 已提交
317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360
static int metaEncodeSchemaEx(void **buf, SSchemaWrapper *pSW) {
  int        tlen = 0;
  SSchemaEx *pSchema;

  tlen += taosEncodeFixedU32(buf, pSW->nCols);
  for (int i = 0; i < pSW->nCols; i++) {
    pSchema = pSW->pSchemaEx + i;
    tlen += taosEncodeFixedI8(buf, pSchema->type);
    tlen += taosEncodeFixedI8(buf, pSchema->sma);
    tlen += taosEncodeFixedI16(buf, pSchema->colId);
    tlen += taosEncodeFixedI32(buf, pSchema->bytes);
    tlen += taosEncodeString(buf, pSchema->name);
  }

  return tlen;
}

static void *metaDecodeSchemaEx(void *buf, SSchemaWrapper *pSW, bool isGetEx) {
  buf = taosDecodeFixedU32(buf, &pSW->nCols);
  if (isGetEx) {
    pSW->pSchemaEx = (SSchemaEx *)taosMemoryMalloc(sizeof(SSchemaEx) * pSW->nCols);
    for (int i = 0; i < pSW->nCols; i++) {
      SSchemaEx *pSchema = pSW->pSchemaEx + i;
      buf = taosDecodeFixedI8(buf, &pSchema->type);
      buf = taosDecodeFixedI8(buf, &pSchema->sma);
      buf = taosDecodeFixedI16(buf, &pSchema->colId);
      buf = taosDecodeFixedI32(buf, &pSchema->bytes);
      buf = taosDecodeStringTo(buf, pSchema->name);
    }
  } else {
    pSW->pSchema = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * pSW->nCols);
    for (int i = 0; i < pSW->nCols; i++) {
      SSchema *pSchema = pSW->pSchema + i;
      buf = taosDecodeFixedI8(buf, &pSchema->type);
      buf = taosSkipFixedLen(buf, sizeof(int8_t));
      buf = taosDecodeFixedI16(buf, &pSchema->colId);
      buf = taosDecodeFixedI32(buf, &pSchema->bytes);
      buf = taosDecodeStringTo(buf, pSchema->name);
    }
  }

  return buf;
}

H
more  
Hongze Cheng 已提交
361 362
static SMetaDB *metaNewDB() {
  SMetaDB *pDB = NULL;
wafwerar's avatar
wafwerar 已提交
363
  pDB = (SMetaDB *)taosMemoryCalloc(1, sizeof(*pDB));
H
more  
Hongze Cheng 已提交
364 365 366 367
  if (pDB == NULL) {
    return NULL;
  }

H
Hongze Cheng 已提交
368
#if IMPL_WITH_LOCK
wafwerar's avatar
wafwerar 已提交
369
  taosThreadRwlockInit(&pDB->rwlock, NULL);
H
Hongze Cheng 已提交
370 371
#endif

H
more  
Hongze Cheng 已提交
372 373 374 375 376
  return pDB;
}

static void metaFreeDB(SMetaDB *pDB) {
  if (pDB) {
H
Hongze Cheng 已提交
377
#if IMPL_WITH_LOCK
wafwerar's avatar
wafwerar 已提交
378
    taosThreadRwlockDestroy(&pDB->rwlock);
H
Hongze Cheng 已提交
379
#endif
wafwerar's avatar
wafwerar 已提交
380
    taosMemoryFree(pDB);
H
more  
Hongze Cheng 已提交
381 382 383 384 385 386 387 388 389 390 391 392 393 394 395
  }
}

static int metaOpenBDBEnv(DB_ENV **ppEnv, const char *path) {
  int     ret;
  DB_ENV *pEnv;

  if (path == NULL) return 0;

  ret = db_env_create(&pEnv, 0);
  if (ret != 0) {
    BDB_PERR("Failed to create META env", ret);
    return -1;
  }

H
Hongze Cheng 已提交
396
  ret = pEnv->open(pEnv, path, DB_CREATE | DB_INIT_CDB | DB_INIT_MPOOL, 0);
H
more  
Hongze Cheng 已提交
397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412
  if (ret != 0) {
    BDB_PERR("Failed to open META env", ret);
    return -1;
  }

  *ppEnv = pEnv;

  return 0;
}

static void metaCloseBDBEnv(DB_ENV *pEnv) {
  if (pEnv) {
    pEnv->close(pEnv, 0);
  }
}

H
Hongze Cheng 已提交
413
static int metaOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup) {
H
more  
Hongze Cheng 已提交
414 415 416
  int ret;
  DB *pDB;

H
Hongze Cheng 已提交
417
  ret = db_create(&(pDB), pEnv, 0);
H
more  
Hongze Cheng 已提交
418 419 420 421 422
  if (ret != 0) {
    BDB_PERR("Failed to create META DB", ret);
    return -1;
  }

H
Hongze Cheng 已提交
423 424 425 426 427 428 429 430
  if (isDup) {
    ret = pDB->set_flags(pDB, DB_DUPSORT);
    if (ret != 0) {
      BDB_PERR("Failed to set DB flags", ret);
      return -1;
    }
  }

H
more  
Hongze Cheng 已提交
431 432 433 434 435 436
  ret = pDB->open(pDB, NULL, pFName, NULL, DB_BTREE, DB_CREATE, 0);
  if (ret) {
    BDB_PERR("Failed to open META DB", ret);
    return -1;
  }

H
Hongze Cheng 已提交
437 438
  *ppDB = pDB;

H
more  
Hongze Cheng 已提交
439 440 441 442 443 444 445 446 447
  return 0;
}

static void metaCloseBDBDb(DB *pDB) {
  if (pDB) {
    pDB->close(pDB, 0);
  }
}

H
Hongze Cheng 已提交
448
static int metaOpenBDBIdx(DB **ppIdx, DB_ENV *pEnv, const char *pFName, DB *pDB, bdbIdxCbPtr cbf, bool isDup) {
H
Hongze Cheng 已提交
449 450 451
  DB *pIdx;
  int ret;

H
Hongze Cheng 已提交
452
  if (metaOpenBDBDb(ppIdx, pEnv, pFName, isDup) < 0) {
H
Hongze Cheng 已提交
453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471
    return -1;
  }

  pIdx = *ppIdx;
  ret = pDB->associate(pDB, NULL, pIdx, cbf, 0);
  if (ret) {
    BDB_PERR("Failed to associate META DB and Index", ret);
  }

  return 0;
}

static void metaCloseBDBIdx(DB *pIdx) {
  if (pIdx) {
    pIdx->close(pIdx, 0);
  }
}

static int metaNameIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
H
Hongze Cheng 已提交
472
  STbCfg *pTbCfg = (STbCfg *)(pValue->app_data);
H
Hongze Cheng 已提交
473

H
Hongze Cheng 已提交
474
  memset(pSKey, 0, sizeof(*pSKey));
H
Hongze Cheng 已提交
475

H
Hongze Cheng 已提交
476 477
  pSKey->data = pTbCfg->name;
  pSKey->size = strlen(pTbCfg->name);
H
Hongze Cheng 已提交
478 479 480 481

  return 0;
}

H
Hongze Cheng 已提交
482
static int metaStbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
H
Hongze Cheng 已提交
483
  STbCfg *pTbCfg = (STbCfg *)(pValue->app_data);
H
Hongze Cheng 已提交
484

H
Hongze Cheng 已提交
485
  if (pTbCfg->type == META_SUPER_TABLE) {
H
Hongze Cheng 已提交
486 487 488
    memset(pSKey, 0, sizeof(*pSKey));
    pSKey->data = pKey->data;
    pSKey->size = pKey->size;
H
Hongze Cheng 已提交
489

H
Hongze Cheng 已提交
490
    return 0;
H
Hongze Cheng 已提交
491
  } else {
H
Hongze Cheng 已提交
492
    return DB_DONOTINDEX;
H
Hongze Cheng 已提交
493
  }
H
Hongze Cheng 已提交
494 495
}

H
Hongze Cheng 已提交
496
static int metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
H
Hongze Cheng 已提交
497
  STbCfg *pTbCfg = (STbCfg *)(pValue->app_data);
H
Hongze Cheng 已提交
498

H
Hongze Cheng 已提交
499
  if (pTbCfg->type == META_NORMAL_TABLE) {
H
Hongze Cheng 已提交
500 501 502
    memset(pSKey, 0, sizeof(*pSKey));
    pSKey->data = pKey->data;
    pSKey->size = pKey->size;
H
Hongze Cheng 已提交
503

H
Hongze Cheng 已提交
504 505 506 507
    return 0;
  } else {
    return DB_DONOTINDEX;
  }
H
Hongze Cheng 已提交
508 509
}

H
Hongze Cheng 已提交
510
static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
H
Hongze Cheng 已提交
511
  STbCfg *pTbCfg = (STbCfg *)(pValue->app_data);
L
fix  
Liu Jicong 已提交
512
  DBT    *pDbt;
H
Hongze Cheng 已提交
513

H
Hongze Cheng 已提交
514
  if (pTbCfg->type == META_CHILD_TABLE) {
wafwerar's avatar
wafwerar 已提交
515
    // pDbt = taosMemoryCalloc(2, sizeof(DBT));
H
Hongze Cheng 已提交
516

H
Hongze Cheng 已提交
517 518 519
    // // First key is suid
    // pDbt[0].data = &(pTbCfg->ctbCfg.suid);
    // pDbt[0].size = sizeof(pTbCfg->ctbCfg.suid);
H
Hongze Cheng 已提交
520

H
Hongze Cheng 已提交
521 522 523 524
    // // Second key is the first tag
    // void *pTagVal = tdGetKVRowValOfCol(pTbCfg->ctbCfg.pTag, (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId);
    // pDbt[1].data = pTagVal;
    // pDbt[1].size = sizeof(int32_t);
H
Hongze Cheng 已提交
525

H
Hongze Cheng 已提交
526
    // Set index key
H
Hongze Cheng 已提交
527
    memset(pSKey, 0, sizeof(*pSKey));
H
Hongze Cheng 已提交
528
#if 0
H
Hongze Cheng 已提交
529 530 531
    pSKey->flags = DB_DBT_MULTIPLE | DB_DBT_APPMALLOC;
    pSKey->data = pDbt;
    pSKey->size = 2;
H
Hongze Cheng 已提交
532 533 534 535
#else
    pSKey->data = &(pTbCfg->ctbCfg.suid);
    pSKey->size = sizeof(pTbCfg->ctbCfg.suid);
#endif
H
Hongze Cheng 已提交
536

H
Hongze Cheng 已提交
537 538 539 540
    return 0;
  } else {
    return DB_DONOTINDEX;
  }
H
Hongze Cheng 已提交
541 542
}

C
Cary Xu 已提交
543
static int metaSmaIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
C
Cary Xu 已提交
544
  STSma *pSmaCfg = (STSma *)(pValue->app_data);
C
Cary Xu 已提交
545 546 547 548 549 550 551 552

  memset(pSKey, 0, sizeof(*pSKey));
  pSKey->data = &(pSmaCfg->tableUid);
  pSKey->size = sizeof(pSmaCfg->tableUid);

  return 0;
}

H
Hongze Cheng 已提交
553
static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) {
H
more  
Hongze Cheng 已提交
554 555 556 557 558
  int tsize = 0;

  tsize += taosEncodeString(buf, pTbCfg->name);
  tsize += taosEncodeFixedU32(buf, pTbCfg->ttl);
  tsize += taosEncodeFixedU32(buf, pTbCfg->keep);
C
Cary Xu 已提交
559
  tsize += taosEncodeFixedU8(buf, pTbCfg->info);
H
more  
Hongze Cheng 已提交
560 561

  if (pTbCfg->type == META_SUPER_TABLE) {
H
Hongze Cheng 已提交
562 563
    SSchemaWrapper sw = {.nCols = pTbCfg->stbCfg.nTagCols, .pSchema = pTbCfg->stbCfg.pTagSchema};
    tsize += metaEncodeSchema(buf, &sw);
H
more  
Hongze Cheng 已提交
564 565 566 567
  } else if (pTbCfg->type == META_CHILD_TABLE) {
    tsize += taosEncodeFixedU64(buf, pTbCfg->ctbCfg.suid);
    tsize += tdEncodeKVRow(buf, pTbCfg->ctbCfg.pTag);
  } else if (pTbCfg->type == META_NORMAL_TABLE) {
568
    // TODO
H
more  
Hongze Cheng 已提交
569 570 571 572 573
  } else {
    ASSERT(0);
  }

  return tsize;
H
Hongze Cheng 已提交
574 575 576
}

static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg) {
H
more  
Hongze Cheng 已提交
577 578 579
  buf = taosDecodeString(buf, &(pTbCfg->name));
  buf = taosDecodeFixedU32(buf, &(pTbCfg->ttl));
  buf = taosDecodeFixedU32(buf, &(pTbCfg->keep));
C
Cary Xu 已提交
580
  buf = taosDecodeFixedU8(buf, &(pTbCfg->info));
H
more  
Hongze Cheng 已提交
581 582

  if (pTbCfg->type == META_SUPER_TABLE) {
H
Hongze Cheng 已提交
583 584 585 586
    SSchemaWrapper sw;
    buf = metaDecodeSchema(buf, &sw);
    pTbCfg->stbCfg.nTagCols = sw.nCols;
    pTbCfg->stbCfg.pTagSchema = sw.pSchema;
H
more  
Hongze Cheng 已提交
587 588 589 590
  } else if (pTbCfg->type == META_CHILD_TABLE) {
    buf = taosDecodeFixedU64(buf, &(pTbCfg->ctbCfg.suid));
    buf = tdDecodeKVRow(buf, &(pTbCfg->ctbCfg.pTag));
  } else if (pTbCfg->type == META_NORMAL_TABLE) {
591
    // TODO
H
more  
Hongze Cheng 已提交
592 593 594 595
  } else {
    ASSERT(0);
  }
  return buf;
H
Hongze Cheng 已提交
596 597
}

H
Hongze Cheng 已提交
598
static void metaClearTbCfg(STbCfg *pTbCfg) {
wafwerar's avatar
wafwerar 已提交
599
  taosMemoryFreeClear(pTbCfg->name);
H
Hongze Cheng 已提交
600 601 602
  if (pTbCfg->type == META_SUPER_TABLE) {
    tdFreeSchema(pTbCfg->stbCfg.pTagSchema);
  } else if (pTbCfg->type == META_CHILD_TABLE) {
wafwerar's avatar
wafwerar 已提交
603
    taosMemoryFreeClear(pTbCfg->ctbCfg.pTag);
H
Hongze Cheng 已提交
604
  }
H
more  
Hongze Cheng 已提交
605 606 607
}

/* ------------------------ FOR QUERY ------------------------ */
H
Hongze Cheng 已提交
608
STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid) {
L
fix  
Liu Jicong 已提交
609
  STbCfg  *pTbCfg = NULL;
H
Hongze Cheng 已提交
610 611 612 613 614 615 616 617 618 619
  SMetaDB *pDB = pMeta->pDB;
  DBT      key = {0};
  DBT      value = {0};
  int      ret;

  // Set key/value
  key.data = &uid;
  key.size = sizeof(uid);

  // Query
H
Hongze Cheng 已提交
620
  metaDBRLock(pDB);
H
Hongze Cheng 已提交
621
  ret = pDB->pTbDB->get(pDB->pTbDB, NULL, &key, &value, 0);
H
Hongze Cheng 已提交
622
  metaDBULock(pDB);
H
Hongze Cheng 已提交
623 624 625 626 627
  if (ret != 0) {
    return NULL;
  }

  // Decode
wafwerar's avatar
wafwerar 已提交
628
  pTbCfg = (STbCfg *)taosMemoryMalloc(sizeof(*pTbCfg));
H
Hongze Cheng 已提交
629 630 631
  if (pTbCfg == NULL) {
    return NULL;
  }
H
more  
Hongze Cheng 已提交
632

H
Hongze Cheng 已提交
633 634 635 636 637
  metaDecodeTbInfo(value.data, pTbCfg);

  return pTbCfg;
}

H
Hongze Cheng 已提交
638
STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) {
L
fix  
Liu Jicong 已提交
639
  STbCfg  *pTbCfg = NULL;
H
Hongze Cheng 已提交
640 641 642 643 644 645 646
  SMetaDB *pDB = pMeta->pDB;
  DBT      key = {0};
  DBT      pkey = {0};
  DBT      pvalue = {0};
  int      ret;

  // Set key/value
H
more  
Hongze Cheng 已提交
647
  key.data = tbname;
H
Hongze Cheng 已提交
648
  key.size = strlen(tbname);
H
more  
Hongze Cheng 已提交
649

H
Hongze Cheng 已提交
650
  // Query
H
Hongze Cheng 已提交
651
  metaDBRLock(pDB);
H
Hongze Cheng 已提交
652
  ret = pDB->pNameIdx->pget(pDB->pNameIdx, NULL, &key, &pkey, &pvalue, 0);
H
Hongze Cheng 已提交
653
  metaDBULock(pDB);
H
Hongze Cheng 已提交
654
  if (ret != 0) {
H
Hongze Cheng 已提交
655
    return NULL;
H
Hongze Cheng 已提交
656 657
  }

H
Hongze Cheng 已提交
658 659
  // Decode
  *uid = *(tb_uid_t *)(pkey.data);
wafwerar's avatar
wafwerar 已提交
660
  pTbCfg = (STbCfg *)taosMemoryMalloc(sizeof(*pTbCfg));
H
Hongze Cheng 已提交
661 662 663 664 665 666 667
  if (pTbCfg == NULL) {
    return NULL;
  }

  metaDecodeTbInfo(pvalue.data, pTbCfg);

  return pTbCfg;
H
more  
Hongze Cheng 已提交
668 669
}

C
Cary Xu 已提交
670
STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
C
Cary Xu 已提交
671
  STSma *  pCfg = NULL;
C
Cary Xu 已提交
672 673 674 675 676 677
  SMetaDB *pDB = pMeta->pDB;
  DBT      key = {0};
  DBT      value = {0};
  int      ret;

  // Set key/value
C
Cary Xu 已提交
678 679
  key.data = (void *)&indexUid;
  key.size = sizeof(indexUid);
C
Cary Xu 已提交
680 681 682 683 684 685 686 687 688 689

  // Query
  metaDBRLock(pDB);
  ret = pDB->pTbDB->get(pDB->pSmaDB, NULL, &key, &value, 0);
  metaDBULock(pDB);
  if (ret != 0) {
    return NULL;
  }

  // Decode
wafwerar's avatar
wafwerar 已提交
690
  pCfg = (STSma *)taosMemoryCalloc(1, sizeof(STSma));
C
Cary Xu 已提交
691 692 693 694
  if (pCfg == NULL) {
    return NULL;
  }

C
Cary Xu 已提交
695
  if (tDecodeTSma(value.data, pCfg) == NULL) {
wafwerar's avatar
wafwerar 已提交
696
    taosMemoryFreeClear(pCfg);
C
Cary Xu 已提交
697 698
    return NULL;
  }
C
Cary Xu 已提交
699 700 701 702

  return pCfg;
}

H
Hongze Cheng 已提交
703
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) {
C
Cary Xu 已提交
704 705 706 707
  return metaGetTableSchemaImpl(pMeta, uid, sver, isinline, false);
}

static SSchemaWrapper *metaGetTableSchemaImpl(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline, bool isGetEx) {
H
more  
Hongze Cheng 已提交
708 709
  uint32_t        nCols;
  SSchemaWrapper *pSW = NULL;
L
fix  
Liu Jicong 已提交
710
  SMetaDB        *pDB = pMeta->pDB;
H
more  
Hongze Cheng 已提交
711
  int             ret;
L
fix  
Liu Jicong 已提交
712
  void           *pBuf;
C
Cary Xu 已提交
713
  // SSchema        *pSchema;
H
Hongze Cheng 已提交
714
  SSchemaKey      schemaKey = {uid, sver, 0};
H
more  
Hongze Cheng 已提交
715 716 717 718 719 720 721 722
  DBT             key = {0};
  DBT             value = {0};

  // Set key/value properties
  key.data = &schemaKey;
  key.size = sizeof(schemaKey);

  // Query
H
Hongze Cheng 已提交
723
  metaDBRLock(pDB);
H
more  
Hongze Cheng 已提交
724
  ret = pDB->pSchemaDB->get(pDB->pSchemaDB, NULL, &key, &value, 0);
H
Hongze Cheng 已提交
725
  metaDBULock(pDB);
H
more  
Hongze Cheng 已提交
726
  if (ret != 0) {
H
Hongze Cheng 已提交
727
    printf("failed to query schema DB since %s================\n", db_strerror(ret));
H
more  
Hongze Cheng 已提交
728 729 730 731 732
    return NULL;
  }

  // Decode the schema
  pBuf = value.data;
wafwerar's avatar
wafwerar 已提交
733
  pSW = taosMemoryMalloc(sizeof(*pSW));
C
Cary Xu 已提交
734
  metaDecodeSchemaEx(pBuf, pSW, isGetEx);
H
more  
Hongze Cheng 已提交
735 736

  return pSW;
H
more  
Hongze Cheng 已提交
737 738 739 740 741 742 743 744
}

struct SMTbCursor {
  DBC *pCur;
};

SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
  SMTbCursor *pTbCur = NULL;
L
fix  
Liu Jicong 已提交
745
  SMetaDB    *pDB = pMeta->pDB;
H
more  
Hongze Cheng 已提交
746

wafwerar's avatar
wafwerar 已提交
747
  pTbCur = (SMTbCursor *)taosMemoryCalloc(1, sizeof(*pTbCur));
H
more  
Hongze Cheng 已提交
748 749 750 751 752 753
  if (pTbCur == NULL) {
    return NULL;
  }

  pDB->pTbDB->cursor(pDB->pTbDB, NULL, &(pTbCur->pCur), 0);

H
more  
Hongze Cheng 已提交
754 755 756 757 758 759
#if 0
    DB_BTREE_STAT *sp;
    pDB->pTbDB->stat(pDB->pTbDB, NULL, &sp, 0);
    printf("**************** %ld\n", sp->bt_nkeys);
#endif

H
more  
Hongze Cheng 已提交
760 761 762
  return pTbCur;
}

D
dapan1121 已提交
763 764 765 766 767 768 769 770 771 772 773 774
int metaGetTbNum(SMeta *pMeta) {
  SMetaDB    *pDB = pMeta->pDB;

  DB_BTREE_STAT *sp1;
  pDB->pTbDB->stat(pDB->pNtbIdx, NULL, &sp1, 0);
  
  DB_BTREE_STAT *sp2;
  pDB->pTbDB->stat(pDB->pCtbIdx, NULL, &sp2, 0);
  
  return sp1->bt_nkeys + sp2->bt_nkeys;
}

H
more  
Hongze Cheng 已提交
775 776 777 778 779
void metaCloseTbCursor(SMTbCursor *pTbCur) {
  if (pTbCur) {
    if (pTbCur->pCur) {
      pTbCur->pCur->close(pTbCur->pCur);
    }
wafwerar's avatar
wafwerar 已提交
780
    taosMemoryFree(pTbCur);
H
more  
Hongze Cheng 已提交
781 782 783 784 785 786 787
  }
}

char *metaTbCursorNext(SMTbCursor *pTbCur) {
  DBT    key = {0};
  DBT    value = {0};
  STbCfg tbCfg;
L
fix  
Liu Jicong 已提交
788
  void  *pBuf;
H
more  
Hongze Cheng 已提交
789

H
Hongze Cheng 已提交
790 791 792 793 794
  for (;;) {
    if (pTbCur->pCur->get(pTbCur->pCur, &key, &value, DB_NEXT) == 0) {
      pBuf = value.data;
      metaDecodeTbInfo(pBuf, &tbCfg);
      if (tbCfg.type == META_SUPER_TABLE) {
wafwerar's avatar
wafwerar 已提交
795 796
        taosMemoryFree(tbCfg.name);
        taosMemoryFree(tbCfg.stbCfg.pTagSchema);
H
Hongze Cheng 已提交
797
        continue;
H
Hongze Cheng 已提交
798
      } else if (tbCfg.type == META_CHILD_TABLE) {
H
Hongze Cheng 已提交
799
        kvRowFree(tbCfg.ctbCfg.pTag);
H
Hongze Cheng 已提交
800 801 802 803 804
      }
      return tbCfg.name;
    } else {
      return NULL;
    }
H
more  
Hongze Cheng 已提交
805
  }
H
more  
Hongze Cheng 已提交
806 807 808 809
}

STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
  STSchemaBuilder sb;
L
fix  
Liu Jicong 已提交
810
  STSchema       *pTSchema = NULL;
C
Cary Xu 已提交
811
  SSchemaEx      *pSchema;
H
more  
Hongze Cheng 已提交
812
  SSchemaWrapper *pSW;
L
fix  
Liu Jicong 已提交
813
  STbCfg         *pTbCfg;
H
Hongze Cheng 已提交
814
  tb_uid_t        quid;
H
more  
Hongze Cheng 已提交
815

H
Hongze Cheng 已提交
816 817 818 819 820 821 822
  pTbCfg = metaGetTbInfoByUid(pMeta, uid);
  if (pTbCfg->type == META_CHILD_TABLE) {
    quid = pTbCfg->ctbCfg.suid;
  } else {
    quid = uid;
  }

C
Cary Xu 已提交
823
  pSW = metaGetTableSchemaImpl(pMeta, quid, sver, true, true);
H
more  
Hongze Cheng 已提交
824 825 826 827 828 829
  if (pSW == NULL) {
    return NULL;
  }

  // Rebuild a schema
  tdInitTSchemaBuilder(&sb, 0);
C
Cary Xu 已提交
830 831 832
  for (int32_t i = 0; i < pSW->nCols; ++i) {
    pSchema = pSW->pSchemaEx + i;
    tdAddColToSchema(&sb, pSchema->type, pSchema->sma, pSchema->colId, pSchema->bytes);
H
more  
Hongze Cheng 已提交
833 834 835 836 837
  }
  pTSchema = tdGetSchemaFromBuilder(&sb);
  tdDestroyTSchemaBuilder(&sb);

  return pTSchema;
H
more  
Hongze Cheng 已提交
838 839 840
}

struct SMCtbCursor {
L
fix  
Liu Jicong 已提交
841
  DBC     *pCur;
H
more  
Hongze Cheng 已提交
842 843 844 845 846
  tb_uid_t suid;
};

SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
  SMCtbCursor *pCtbCur = NULL;
L
fix  
Liu Jicong 已提交
847
  SMetaDB     *pDB = pMeta->pDB;
H
more  
Hongze Cheng 已提交
848 849
  int          ret;

wafwerar's avatar
wafwerar 已提交
850
  pCtbCur = (SMCtbCursor *)taosMemoryCalloc(1, sizeof(*pCtbCur));
H
more  
Hongze Cheng 已提交
851 852 853 854 855 856 857
  if (pCtbCur == NULL) {
    return NULL;
  }

  pCtbCur->suid = uid;
  ret = pDB->pCtbIdx->cursor(pDB->pCtbIdx, NULL, &(pCtbCur->pCur), 0);
  if (ret != 0) {
wafwerar's avatar
wafwerar 已提交
858
    taosMemoryFree(pCtbCur);
H
more  
Hongze Cheng 已提交
859 860 861 862 863 864 865 866 867 868 869 870
    return NULL;
  }

  return pCtbCur;
}

void metaCloseCtbCurosr(SMCtbCursor *pCtbCur) {
  if (pCtbCur) {
    if (pCtbCur->pCur) {
      pCtbCur->pCur->close(pCtbCur->pCur);
    }

wafwerar's avatar
wafwerar 已提交
871
    taosMemoryFree(pCtbCur);
H
more  
Hongze Cheng 已提交
872 873 874
  }
}

875
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
H
more  
Hongze Cheng 已提交
876 877 878
  DBT    skey = {0};
  DBT    pkey = {0};
  DBT    pval = {0};
L
fix  
Liu Jicong 已提交
879
  void  *pBuf;
H
more  
Hongze Cheng 已提交
880 881 882 883 884 885
  STbCfg tbCfg;

  // Set key
  skey.data = &(pCtbCur->suid);
  skey.size = sizeof(pCtbCur->suid);

886 887 888 889
  if (pCtbCur->pCur->pget(pCtbCur->pCur, &skey, &pkey, &pval, DB_NEXT) == 0) {
    tb_uid_t id = *(tb_uid_t *)pkey.data;
    assert(id != 0);
    return id;
H
Hongze Cheng 已提交
890 891
    //    metaDecodeTbInfo(pBuf, &tbCfg);
    //    return tbCfg.;
H
more  
Hongze Cheng 已提交
892
  } else {
893
    return 0;
H
more  
Hongze Cheng 已提交
894
  }
H
Hongze Cheng 已提交
895 896
}

C
Cary Xu 已提交
897 898 899 900 901 902 903 904 905 906
struct SMSmaCursor {
  DBC     *pCur;
  tb_uid_t uid;
};

SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) {
  SMSmaCursor *pCur = NULL;
  SMetaDB     *pDB = pMeta->pDB;
  int          ret;

wafwerar's avatar
wafwerar 已提交
907
  pCur = (SMSmaCursor *)taosMemoryCalloc(1, sizeof(*pCur));
C
Cary Xu 已提交
908 909 910 911 912
  if (pCur == NULL) {
    return NULL;
  }

  pCur->uid = uid;
C
Cary Xu 已提交
913
  // TODO: lock?
C
Cary Xu 已提交
914
  ret = pDB->pCtbIdx->cursor(pDB->pSmaIdx, NULL, &(pCur->pCur), 0);
C
Cary Xu 已提交
915
  if ((ret != 0) || (pCur->pCur == NULL)) {
wafwerar's avatar
wafwerar 已提交
916
    taosMemoryFree(pCur);
C
Cary Xu 已提交
917 918 919 920 921 922 923 924 925 926 927 928
    return NULL;
  }

  return pCur;
}

void metaCloseSmaCurosr(SMSmaCursor *pCur) {
  if (pCur) {
    if (pCur->pCur) {
      pCur->pCur->close(pCur->pCur);
    }

wafwerar's avatar
wafwerar 已提交
929
    taosMemoryFree(pCur);
C
Cary Xu 已提交
930 931 932
  }
}

C
Cary Xu 已提交
933 934 935 936
const char *metaSmaCursorNext(SMSmaCursor *pCur) {
  DBT skey = {0};
  DBT pkey = {0};
  DBT pval = {0};
C
Cary Xu 已提交
937 938 939 940

  // Set key
  skey.data = &(pCur->uid);
  skey.size = sizeof(pCur->uid);
C
Cary Xu 已提交
941
  // TODO: lock?
C
Cary Xu 已提交
942
  if (pCur->pCur->pget(pCur->pCur, &skey, &pkey, &pval, DB_NEXT) == 0) {
C
Cary Xu 已提交
943
    const char *indexName = (const char *)pkey.data;
C
Cary Xu 已提交
944 945 946
    assert(indexName != NULL);
    return indexName;
  } else {
C
Cary Xu 已提交
947
    return NULL;
C
Cary Xu 已提交
948 949 950
  }
}

C
Cary Xu 已提交
951
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) {
C
Cary Xu 已提交
952 953
  STSmaWrapper *pSW = NULL;

wafwerar's avatar
wafwerar 已提交
954
  pSW = taosMemoryCalloc(1, sizeof(*pSW));
C
Cary Xu 已提交
955 956 957 958 959 960
  if (pSW == NULL) {
    return NULL;
  }

  SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid);
  if (pCur == NULL) {
wafwerar's avatar
wafwerar 已提交
961
    taosMemoryFree(pSW);
C
Cary Xu 已提交
962 963 964
    return NULL;
  }

C
Cary Xu 已提交
965 966
  DBT   skey = {.data = &(pCur->uid), .size = sizeof(pCur->uid)};
  DBT   pval = {0};
C
Cary Xu 已提交
967 968 969 970 971 972
  void *pBuf = NULL;

  while (true) {
    // TODO: lock?
    if (pCur->pCur->pget(pCur->pCur, &skey, NULL, &pval, DB_NEXT) == 0) {
      ++pSW->number;
wafwerar's avatar
wafwerar 已提交
973
      STSma *tptr = (STSma *)taosMemoryRealloc(pSW->tSma, pSW->number * sizeof(STSma));
C
Cary Xu 已提交
974 975
      if (tptr == NULL) {
        metaCloseSmaCurosr(pCur);
C
Cary Xu 已提交
976
        tdDestroyTSmaWrapper(pSW);
wafwerar's avatar
wafwerar 已提交
977
        taosMemoryFreeClear(pSW);
C
Cary Xu 已提交
978 979 980 981 982 983
        return NULL;
      }
      pSW->tSma = tptr;
      pBuf = pval.data;
      if (tDecodeTSma(pBuf, pSW->tSma + pSW->number - 1) == NULL) {
        metaCloseSmaCurosr(pCur);
C
Cary Xu 已提交
984
        tdDestroyTSmaWrapper(pSW);
wafwerar's avatar
wafwerar 已提交
985
        taosMemoryFreeClear(pSW);
C
Cary Xu 已提交
986 987 988 989 990 991 992 993
        return NULL;
      }
      continue;
    }
    break;
  }

  metaCloseSmaCurosr(pCur);
C
Cary Xu 已提交
994
  
C
Cary Xu 已提交
995 996
  return pSW;
}
C
Cary Xu 已提交
997

C
Cary Xu 已提交
998
SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup) {
C
Cary Xu 已提交
999
  SArray  *pUids = NULL;
C
Cary Xu 已提交
1000
  SMetaDB *pDB = pMeta->pDB;
C
Cary Xu 已提交
1001
  DBC     *pCur = NULL;
C
Cary Xu 已提交
1002
  DBT      pkey = {0}, pval = {0};
1003
  uint32_t mode = isDup ? DB_NEXT_DUP : DB_NEXT_NODUP;
C
Cary Xu 已提交
1004 1005 1006 1007 1008 1009 1010 1011
  int      ret;

  // TODO: lock?
  ret = pDB->pCtbIdx->cursor(pDB->pSmaIdx, NULL, &pCur, 0);
  if (ret != 0) {
    return NULL;
  }
  // TODO: lock?
C
Cary Xu 已提交
1012

1013
  while ((ret = pCur->get(pCur, &pkey, &pval, mode)) == 0) {
C
Cary Xu 已提交
1014 1015 1016 1017 1018 1019 1020 1021
    if (!pUids) {
      pUids = taosArrayInit(16, sizeof(tb_uid_t));
      if (!pUids) {
        return NULL;
      }
    }

    taosArrayPush(pUids, pkey.data);
C
Cary Xu 已提交
1022
  }
C
Cary Xu 已提交
1023
  // TODO: lock?
C
Cary Xu 已提交
1024 1025 1026 1027 1028 1029 1030 1031

  if (pCur) {
    pCur->close(pCur);
  }

  return pUids;
}

H
Hongze Cheng 已提交
1032 1033
static void metaDBWLock(SMetaDB *pDB) {
#if IMPL_WITH_LOCK
wafwerar's avatar
wafwerar 已提交
1034
  taosThreadRwlockWrlock(&(pDB->rwlock));
H
Hongze Cheng 已提交
1035 1036 1037 1038 1039
#endif
}

static void metaDBRLock(SMetaDB *pDB) {
#if IMPL_WITH_LOCK
wafwerar's avatar
wafwerar 已提交
1040
  taosThreadRwlockRdlock(&(pDB->rwlock));
H
Hongze Cheng 已提交
1041 1042 1043 1044 1045
#endif
}

static void metaDBULock(SMetaDB *pDB) {
#if IMPL_WITH_LOCK
wafwerar's avatar
wafwerar 已提交
1046
  taosThreadRwlockUnlock(&(pDB->rwlock));
H
Hongze Cheng 已提交
1047 1048
#endif
}