metaBDBImpl.c 17.4 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#define ALLOW_FORBID_FUNC
H
more  
Hongze Cheng 已提交
17 18
#include "db.h"

H
Hongze Cheng 已提交
19 20
#include "metaDef.h"

H
Hongze Cheng 已提交
21
#include "tcoding.h"
H
more  
Hongze Cheng 已提交
22
#include "thash.h"
H
Hongze Cheng 已提交
23

H
Hongze Cheng 已提交
24 25 26 27
// When non-zero, SMetaDB carries a pthread rwlock and the metaDB{W,R,U}Lock
// helpers in this file take/release it; when zero those helpers compile to no-ops.
#define IMPL_WITH_LOCK 1
// #if IMPL_WITH_LOCK
// #endif

H
Hongze Cheng 已提交
28 29 30
/* Key of a record in the schema DB: the owning table's uid plus a schema version. */
typedef struct {
  tb_uid_t uid;      // uid the schema is stored under
  int32_t  sver;     // schema version
  int32_t  padding;  // explicit filler; every initializer in this file sets it to 0
} SSchemaKey;

H
Hongze Cheng 已提交
34
struct SMetaDB {
H
Hongze Cheng 已提交
35 36 37
#if IMPL_WITH_LOCK
  pthread_rwlock_t rwlock;
#endif
H
more  
Hongze Cheng 已提交
38
  // DB
H
more  
Hongze Cheng 已提交
39 40
  DB *pTbDB;
  DB *pSchemaDB;
C
Cary Xu 已提交
41
  DB *pSmaDB;
C
Cary Xu 已提交
42

H
more  
Hongze Cheng 已提交
43
  // IDX
H
more  
Hongze Cheng 已提交
44 45 46 47
  DB *pNameIdx;
  DB *pStbIdx;
  DB *pNtbIdx;
  DB *pCtbIdx;
H
more  
Hongze Cheng 已提交
48
  // ENV
H
Hongze Cheng 已提交
49 50 51
  DB_ENV *pEvn;
};

H
Hongze Cheng 已提交
52 53
// Signature of a secondary-index key extractor handed to DB->associate():
// (index handle, primary key, primary value, out: secondary key).
typedef int (*bdbIdxCbPtr)(DB *, const DBT *, const DBT *, DBT *);

H
more  
Hongze Cheng 已提交
54 55
static SMetaDB *metaNewDB();
static void     metaFreeDB(SMetaDB *pDB);
H
more  
Hongze Cheng 已提交
56 57
static int      metaOpenBDBEnv(DB_ENV **ppEnv, const char *path);
static void     metaCloseBDBEnv(DB_ENV *pEnv);
H
Hongze Cheng 已提交
58
static int      metaOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup);
H
more  
Hongze Cheng 已提交
59
static void     metaCloseBDBDb(DB *pDB);
H
Hongze Cheng 已提交
60
static int      metaOpenBDBIdx(DB **ppIdx, DB_ENV *pEnv, const char *pFName, DB *pDB, bdbIdxCbPtr cbf, bool isDup);
H
Hongze Cheng 已提交
61
static void     metaCloseBDBIdx(DB *pIdx);
H
Hongze Cheng 已提交
62 63 64 65
static int      metaNameIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
static int      metaStbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
static int      metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
static int      metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
H
Hongze Cheng 已提交
66
static int      metaEncodeTbInfo(void **buf, STbCfg *pTbCfg);
L
fix  
Liu Jicong 已提交
67
static void    *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg);
H
Hongze Cheng 已提交
68
static void     metaClearTbCfg(STbCfg *pTbCfg);
H
Hongze Cheng 已提交
69
static int      metaEncodeSchema(void **buf, SSchemaWrapper *pSW);
L
fix  
Liu Jicong 已提交
70
static void    *metaDecodeSchema(void *buf, SSchemaWrapper *pSW);
H
Hongze Cheng 已提交
71 72 73
static void     metaDBWLock(SMetaDB *pDB);
static void     metaDBRLock(SMetaDB *pDB);
static void     metaDBULock(SMetaDB *pDB);
H
more  
Hongze Cheng 已提交
74

H
more  
Hongze Cheng 已提交
75
// Print `info` followed by the Berkeley DB error text for `code` to stderr.
// `info` must be a string literal (it is pasted onto the format string); no newline is appended.
#define BDB_PERR(info, code) fprintf(stderr, info " reason: %s", db_strerror(code))
H
Hongze Cheng 已提交
76

H
Hongze Cheng 已提交
77
/*
 * Create the SMetaDB object for pMeta and open the BDB environment, the three
 * primary databases and the four secondary indices on the table DB.
 *
 * Returns 0 on success. If creating the object fails, -1 is returned directly;
 * on any later failure the partially opened state is released via
 * metaCloseDB() before returning -1.
 */
int metaOpenDB(SMeta *pMeta) {
  SMetaDB *pMetaDb = metaNewDB();
  if (pMetaDb == NULL) {
    return -1;
  }

  // Attach first so metaCloseDB() can clean up whatever has been opened so far.
  pMeta->pDB = pMetaDb;

  // Environment
  if (metaOpenBDBEnv(&pMetaDb->pEvn, pMeta->path) < 0) goto _err;

  // Primary databases
  if (metaOpenBDBDb(&pMetaDb->pTbDB, pMetaDb->pEvn, "meta.db", false) < 0) goto _err;
  if (metaOpenBDBDb(&pMetaDb->pSchemaDB, pMetaDb->pEvn, "schema.db", false) < 0) goto _err;
  if (metaOpenBDBDb(&pMetaDb->pSmaDB, pMetaDb->pEvn, "sma.db", false) < 0) goto _err;

  // Secondary indices; only the child-table index allows duplicate keys
  if (metaOpenBDBIdx(&pMetaDb->pNameIdx, pMetaDb->pEvn, "name.index", pMetaDb->pTbDB, &metaNameIdxCb, false) < 0) goto _err;
  if (metaOpenBDBIdx(&pMetaDb->pStbIdx, pMetaDb->pEvn, "stb.index", pMetaDb->pTbDB, &metaStbIdxCb, false) < 0) goto _err;
  if (metaOpenBDBIdx(&pMetaDb->pNtbIdx, pMetaDb->pEvn, "ntb.index", pMetaDb->pTbDB, &metaNtbIdxCb, false) < 0) goto _err;
  if (metaOpenBDBIdx(&pMetaDb->pCtbIdx, pMetaDb->pEvn, "ctb.index", pMetaDb->pTbDB, &metaCtbIdxCb, true) < 0) goto _err;

  return 0;

_err:
  metaCloseDB(pMeta);
  return -1;
}

/*
 * Close everything metaOpenDB() opened and free the SMetaDB object.
 * Indices are closed first, then the primary databases, then the environment.
 * Does nothing when pMeta->pDB is NULL; pMeta->pDB is reset to NULL afterwards.
 */
void metaCloseDB(SMeta *pMeta) {
  SMetaDB *pMetaDb = pMeta->pDB;
  if (pMetaDb == NULL) {
    return;
  }

  metaCloseBDBIdx(pMetaDb->pCtbIdx);
  metaCloseBDBIdx(pMetaDb->pNtbIdx);
  metaCloseBDBIdx(pMetaDb->pStbIdx);
  metaCloseBDBIdx(pMetaDb->pNameIdx);

  metaCloseBDBDb(pMetaDb->pSmaDB);
  metaCloseBDBDb(pMetaDb->pSchemaDB);
  metaCloseBDBDb(pMetaDb->pTbDB);

  metaCloseBDBEnv(pMetaDb->pEvn);

  metaFreeDB(pMetaDb);
  pMeta->pDB = NULL;
}

H
more  
Hongze Cheng 已提交
149
/*
 * Persist a table definition.
 *
 * The encoded table info is written to the table DB under the table uid
 * (the super-table uid from the config for super tables, a freshly generated
 * uid otherwise). For super and normal tables the column schema is also
 * written to the schema DB under {uid, version 0}.
 *
 * Returns 0 on success, -1 if either put() fails. The two writes are not
 * transactional: if the schema write fails the table record has already
 * been stored.
 */
int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
  SMetaDB   *pDB = pMeta->pDB;
  tb_uid_t   uid;
  // NOTE(review): both encode buffers are fixed at 512 bytes and nothing bounds
  // the encoded size; a long name / large schema would overflow them — confirm limits.
  char       buf[512];
  char       buf1[512];
  void      *pBuf;
  DBT        key1, value1;
  DBT        key2, value2;
  SSchemaKey schemaKey;
  SSchema   *pSchema = NULL;
  uint32_t   ncols = 0;
  int        ret;

  if (pTbCfg->type == META_SUPER_TABLE) {
    uid = pTbCfg->stbCfg.suid;
  } else {
    uid = metaGenerateUid(pMeta);
  }

  // Table info record
  memset(&key1, 0, sizeof(key1));
  memset(&value1, 0, sizeof(value1));

  key1.data = &uid;
  key1.size = sizeof(uid);

  pBuf = buf;
  metaEncodeTbInfo(&pBuf, pTbCfg);

  value1.data = buf;
  value1.size = POINTER_DISTANCE(pBuf, buf);
  // The secondary-index callbacks read the table config from app_data
  // instead of decoding the value again.
  value1.app_data = pTbCfg;

  // Schema record (child tables carry no schema of their own)
  if (pTbCfg->type == META_SUPER_TABLE) {
    ncols = pTbCfg->stbCfg.nCols;
    pSchema = pTbCfg->stbCfg.pSchema;
  } else if (pTbCfg->type == META_NORMAL_TABLE) {
    ncols = pTbCfg->ntbCfg.nCols;
    pSchema = pTbCfg->ntbCfg.pSchema;
  }

  if (pSchema) {
    memset(&key2, 0, sizeof(key2));
    memset(&value2, 0, sizeof(value2));

    schemaKey = (SSchemaKey){uid, 0 /*TODO*/, 0};
    key2.data = &schemaKey;
    key2.size = sizeof(schemaKey);

    SSchemaWrapper sw = {.nCols = ncols, .pSchema = pSchema};
    pBuf = buf1;
    metaEncodeSchema(&pBuf, &sw);

    value2.data = buf1;
    value2.size = POINTER_DISTANCE(pBuf, buf1);
  }

  metaDBWLock(pDB);
  ret = pDB->pTbDB->put(pDB->pTbDB, NULL, &key1, &value1, 0);
  if (ret == 0 && pSchema) {
    ret = pDB->pSchemaDB->put(pDB->pSchemaDB, NULL, &key2, &value2, 0);
  }
  metaDBULock(pDB);

  if (ret != 0) {
    BDB_PERR("Failed to save table to META DB", ret);
    return -1;
  }

  return 0;
}

/* Placeholder: removal is not implemented yet. Nothing is deleted and 0 is returned. */
int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) {
  // TODO
  (void)pMeta;
  (void)uid;
  return 0;
}

/* ------------------------ STATIC METHODS ------------------------ */
H
Hongze Cheng 已提交
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253
/*
 * Serialize a schema through the taosEncode* helpers: the column count
 * followed by (type, colId, bytes, name) for each column.
 * Returns the sum of the sizes reported by the encode helpers.
 */
static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW) {
  int nBytes = taosEncodeFixedU32(buf, pSW->nCols);

  for (int iCol = 0; iCol < pSW->nCols; iCol++) {
    SSchema *pCol = &pSW->pSchema[iCol];

    nBytes += taosEncodeFixedI8(buf, pCol->type);
    nBytes += taosEncodeFixedI32(buf, pCol->colId);
    nBytes += taosEncodeFixedI32(buf, pCol->bytes);
    nBytes += taosEncodeString(buf, pCol->name);
  }

  return nBytes;
}

/*
 * Inverse of metaEncodeSchema(): read the column count and then each column's
 * (type, colId, bytes, name) from buf into a newly allocated pSW->pSchema array.
 *
 * Returns the read position after the schema. If the column array cannot be
 * allocated, pSW is left describing an empty schema (nCols = 0, pSchema = NULL)
 * and NULL is returned. The array is heap-allocated; release it with free().
 */
static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW) {
  SSchema *pSchema;

  buf = taosDecodeFixedU32(buf, &pSW->nCols);

  pSW->pSchema = NULL;
  if (pSW->nCols == 0) {
    return buf;
  }

  // calloc also guards the count * size multiplication against overflow.
  pSW->pSchema = (SSchema *)calloc(pSW->nCols, sizeof(SSchema));
  if (pSW->pSchema == NULL) {
    pSW->nCols = 0;
    return NULL;
  }

  for (uint32_t i = 0; i < pSW->nCols; i++) {
    pSchema = pSW->pSchema + i;
    buf = taosDecodeFixedI8(buf, &pSchema->type);
    buf = taosDecodeFixedI32(buf, &pSchema->colId);
    buf = taosDecodeFixedI32(buf, &pSchema->bytes);
    buf = taosDecodeStringTo(buf, pSchema->name);
  }

  return buf;
}

H
more  
Hongze Cheng 已提交
254 255 256 257 258 259 260
/*
 * Allocate a zero-filled SMetaDB and, when locking is compiled in,
 * initialize its rwlock. Returns NULL if the allocation fails.
 */
static SMetaDB *metaNewDB() {
  SMetaDB *pNew = (SMetaDB *)calloc(1, sizeof(SMetaDB));

  if (pNew != NULL) {
#if IMPL_WITH_LOCK
    pthread_rwlock_init(&pNew->rwlock, NULL);
#endif
  }

  return pNew;
}

/* Destroy the rwlock (when compiled in) and free the SMetaDB. NULL is ignored. */
static void metaFreeDB(SMetaDB *pDB) {
  if (pDB == NULL) {
    return;
  }

#if IMPL_WITH_LOCK
  pthread_rwlock_destroy(&pDB->rwlock);
#endif
  free(pDB);
}

/*
 * Create a Berkeley DB environment handle and open it at `path`
 * (DB_CREATE | DB_INIT_CDB | DB_INIT_MPOOL).
 *
 * A NULL path is accepted: nothing is opened, *ppEnv is left untouched and
 * 0 is returned. Otherwise returns 0 with *ppEnv set, or -1 on failure, in
 * which case no handle is left open.
 */
static int metaOpenBDBEnv(DB_ENV **ppEnv, const char *path) {
  int     ret;
  DB_ENV *pEnv = NULL;

  if (path == NULL) return 0;

  ret = db_env_create(&pEnv, 0);
  if (ret != 0) {
    BDB_PERR("Failed to create META env", ret);
    return -1;
  }

  ret = pEnv->open(pEnv, path, DB_CREATE | DB_INIT_CDB | DB_INIT_MPOOL, 0);
  if (ret != 0) {
    BDB_PERR("Failed to open META env", ret);
    // A handle whose open() failed still has to be discarded with close().
    pEnv->close(pEnv, 0);
    return -1;
  }

  *ppEnv = pEnv;

  return 0;
}

/* Close a BDB environment handle; a NULL handle is ignored. */
static void metaCloseBDBEnv(DB_ENV *pEnv) {
  if (pEnv == NULL) {
    return;
  }
  pEnv->close(pEnv, 0);
}

H
Hongze Cheng 已提交
306
/*
 * Create a DB handle in pEnv and open (creating if needed) the B-tree
 * database file pFName. When isDup is true the database is configured with
 * DB_DUPSORT before opening.
 *
 * Returns 0 with *ppDB set on success. On failure returns -1, leaves *ppDB
 * untouched and closes any handle that was created.
 */
static int metaOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup) {
  int ret;
  DB *pDB = NULL;

  ret = db_create(&pDB, pEnv, 0);
  if (ret != 0) {
    BDB_PERR("Failed to create META DB", ret);
    return -1;
  }

  if (isDup) {
    ret = pDB->set_flags(pDB, DB_DUPSORT);
    if (ret != 0) {
      BDB_PERR("Failed to set DB flags", ret);
      goto _err;
    }
  }

  ret = pDB->open(pDB, NULL, pFName, NULL, DB_BTREE, DB_CREATE, 0);
  if (ret != 0) {
    BDB_PERR("Failed to open META DB", ret);
    goto _err;
  }

  *ppDB = pDB;

  return 0;

_err:
  // The handle has to be discarded with close() even if open() never succeeded.
  pDB->close(pDB, 0);
  return -1;
}

/* Close a BDB database handle; a NULL handle is ignored. */
static void metaCloseBDBDb(DB *pDB) {
  if (pDB == NULL) {
    return;
  }
  pDB->close(pDB, 0);
}

H
Hongze Cheng 已提交
341
/*
 * Open the database file pFName as a secondary index of the primary database
 * pDB, using cbf to derive the secondary key of each record.
 *
 * Returns 0 with *ppIdx set on success. Returns -1 if the index database
 * cannot be opened or cannot be associated with pDB; in the latter case the
 * index handle is closed and *ppIdx is reset to NULL.
 */
static int metaOpenBDBIdx(DB **ppIdx, DB_ENV *pEnv, const char *pFName, DB *pDB, bdbIdxCbPtr cbf, bool isDup) {
  DB *pIdx;
  int ret;

  if (metaOpenBDBDb(ppIdx, pEnv, pFName, isDup) < 0) {
    return -1;
  }

  pIdx = *ppIdx;
  ret = pDB->associate(pDB, NULL, pIdx, cbf, 0);
  if (ret != 0) {
    BDB_PERR("Failed to associate META DB and Index", ret);
    // An unassociated index would never be maintained; report the failure
    // instead of handing it back as usable.
    pIdx->close(pIdx, 0);
    *ppIdx = NULL;
    return -1;
  }

  return 0;
}

/* Close a secondary-index handle; a NULL handle is ignored. */
static void metaCloseBDBIdx(DB *pIdx) {
  if (pIdx == NULL) {
    return;
  }
  pIdx->close(pIdx, 0);
}

/*
 * Secondary-key extractor for the name index: the key is the table name
 * (without the terminating NUL). The table config is read from
 * pValue->app_data rather than decoded from the stored bytes.
 */
static int metaNameIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
  STbCfg *pCfg = (STbCfg *)pValue->app_data;

  memset(pSKey, 0, sizeof(DBT));
  pSKey->data = pCfg->name;
  pSKey->size = strlen(pCfg->name);

  return 0;
}

H
Hongze Cheng 已提交
375
/*
 * Secondary-key extractor for the super-table index: super tables are indexed
 * under their own primary key; every other table type is left out of the index.
 */
static int metaStbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
  STbCfg *pCfg = (STbCfg *)pValue->app_data;

  if (pCfg->type != META_SUPER_TABLE) {
    return DB_DONOTINDEX;
  }

  memset(pSKey, 0, sizeof(DBT));
  pSKey->data = pKey->data;
  pSKey->size = pKey->size;
  return 0;
}

H
Hongze Cheng 已提交
389
/*
 * Secondary-key extractor for the normal-table index: normal tables are indexed
 * under their own primary key; every other table type is left out of the index.
 */
static int metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
  STbCfg *pCfg = (STbCfg *)pValue->app_data;

  if (pCfg->type != META_NORMAL_TABLE) {
    return DB_DONOTINDEX;
  }

  memset(pSKey, 0, sizeof(DBT));
  pSKey->data = pKey->data;
  pSKey->size = pKey->size;
  return 0;
}

H
Hongze Cheng 已提交
403
/*
 * Secondary-key extractor for the child-table index: child tables are indexed
 * under the uid of their super table; every other table type is left out.
 *
 * An earlier draft (previously kept here as disabled code) returned two keys
 * per child table — the suid and the first tag value — via DB_DBT_MULTIPLE.
 */
static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
  STbCfg *pCfg = (STbCfg *)pValue->app_data;

  if (pCfg->type != META_CHILD_TABLE) {
    return DB_DONOTINDEX;
  }

  // Set index key
  memset(pSKey, 0, sizeof(DBT));
  pSKey->data = &(pCfg->ctbCfg.suid);
  pSKey->size = sizeof(pCfg->ctbCfg.suid);
  return 0;
}

/*
 * Serialize a table config: name, ttl, keep and type, followed by a
 * type-specific tail — the tag schema for super tables, the super-table uid
 * and tag row for child tables, nothing for normal tables.
 * Returns the sum of the sizes reported by the encode helpers.
 */
static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) {
  int nBytes = 0;

  // Common header
  nBytes += taosEncodeString(buf, pTbCfg->name);
  nBytes += taosEncodeFixedU32(buf, pTbCfg->ttl);
  nBytes += taosEncodeFixedU32(buf, pTbCfg->keep);
  nBytes += taosEncodeFixedU8(buf, pTbCfg->type);

  // Type-specific tail
  switch (pTbCfg->type) {
    case META_SUPER_TABLE: {
      SSchemaWrapper tagSchema = {.nCols = pTbCfg->stbCfg.nTagCols, .pSchema = pTbCfg->stbCfg.pTagSchema};
      nBytes += metaEncodeSchema(buf, &tagSchema);
      break;
    }
    case META_CHILD_TABLE:
      nBytes += taosEncodeFixedU64(buf, pTbCfg->ctbCfg.suid);
      nBytes += tdEncodeKVRow(buf, pTbCfg->ctbCfg.pTag);
      break;
    case META_NORMAL_TABLE:
      break;
    default:
      ASSERT(0);
      break;
  }

  return nBytes;
}

/*
 * Inverse of metaEncodeTbInfo(): fill pTbCfg from buf. Name, ttl, keep and
 * type are always read; super tables additionally get their tag schema,
 * child tables their super-table uid and tag row.
 * Returns the read position after the decoded record.
 */
static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg) {
  // Common header
  buf = taosDecodeString(buf, &(pTbCfg->name));
  buf = taosDecodeFixedU32(buf, &(pTbCfg->ttl));
  buf = taosDecodeFixedU32(buf, &(pTbCfg->keep));
  buf = taosDecodeFixedU8(buf, &(pTbCfg->type));

  // Type-specific tail
  switch (pTbCfg->type) {
    case META_SUPER_TABLE: {
      SSchemaWrapper tagSchema;
      buf = metaDecodeSchema(buf, &tagSchema);
      pTbCfg->stbCfg.nTagCols = tagSchema.nCols;
      pTbCfg->stbCfg.pTagSchema = tagSchema.pSchema;
      break;
    }
    case META_CHILD_TABLE:
      buf = taosDecodeFixedU64(buf, &(pTbCfg->ctbCfg.suid));
      buf = tdDecodeKVRow(buf, &(pTbCfg->ctbCfg.pTag));
      break;
    case META_NORMAL_TABLE:
      break;
    default:
      ASSERT(0);
      break;
  }

  return buf;
}

H
Hongze Cheng 已提交
479 480 481 482 483 484 485
/*
 * Release the members of a table config: the name, plus the tag schema of a
 * super table or the tag row of a child table. The STbCfg itself is not freed.
 */
static void metaClearTbCfg(STbCfg *pTbCfg) {
  tfree(pTbCfg->name);

  switch (pTbCfg->type) {
    case META_SUPER_TABLE:
      tdFreeSchema(pTbCfg->stbCfg.pTagSchema);
      break;
    case META_CHILD_TABLE:
      tfree(pTbCfg->ctbCfg.pTag);
      break;
    default:
      break;
  }
}

/* ------------------------ FOR QUERY ------------------------ */
H
Hongze Cheng 已提交
489
/*
 * Look up a table by uid in the table DB and return its decoded config.
 *
 * Returns a heap-allocated STbCfg, or NULL when the uid is not found, the
 * query fails or memory cannot be allocated. The struct itself comes from
 * calloc(); its members are allocated by metaDecodeTbInfo().
 */
STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid) {
  STbCfg  *pTbCfg = NULL;
  SMetaDB *pDB = pMeta->pDB;
  DBT      key = {0};
  DBT      value = {0};

  // Set key/value
  key.data = &uid;
  key.size = sizeof(uid);

  // Query and decode. With default DBT flags value.data points into memory
  // owned by the DB handle that a later call on the handle may overwrite, so
  // the record is decoded before the lock is released.
  // NOTE(review): a read lock still admits concurrent readers on the same
  // handle — confirm whether that needs DB_THREAD / user-owned DBT memory.
  metaDBRLock(pDB);
  if (pDB->pTbDB->get(pDB->pTbDB, NULL, &key, &value, 0) == 0) {
    pTbCfg = (STbCfg *)calloc(1, sizeof(*pTbCfg));
    if (pTbCfg != NULL) {
      metaDecodeTbInfo(value.data, pTbCfg);
    }
  }
  metaDBULock(pDB);

  return pTbCfg;
}

H
Hongze Cheng 已提交
519
/*
 * Look up a table by name through the name index.
 *
 * On a hit, *uid receives the table's uid (the primary key) and a
 * heap-allocated, decoded STbCfg is returned. Returns NULL when the name is
 * not found, the query fails, or memory cannot be allocated; in the last
 * case *uid has already been written.
 */
STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) {
  STbCfg  *pTbCfg = NULL;
  SMetaDB *pDB = pMeta->pDB;
  DBT      key = {0};
  DBT      pkey = {0};
  DBT      pvalue = {0};

  // Set key/value
  key.data = tbname;
  key.size = strlen(tbname);

  // Query and decode. pkey.data / pvalue.data point into memory owned by the
  // DB handle that a later call on the handle may overwrite, so both are
  // consumed before the lock is released.
  metaDBRLock(pDB);
  if (pDB->pNameIdx->pget(pDB->pNameIdx, NULL, &key, &pkey, &pvalue, 0) == 0) {
    *uid = *(tb_uid_t *)(pkey.data);
    pTbCfg = (STbCfg *)calloc(1, sizeof(*pTbCfg));
    if (pTbCfg != NULL) {
      metaDecodeTbInfo(pvalue.data, pTbCfg);
    }
  }
  metaDBULock(pDB);

  return pTbCfg;
}

H
Hongze Cheng 已提交
551
/*
 * Fetch the schema stored under {uid, sver} in the schema DB.
 *
 * Returns a heap-allocated SSchemaWrapper whose pSchema array is also
 * heap-allocated (free both with free()), or NULL if the record is missing,
 * the query fails or memory cannot be allocated. `isinline` is currently unused.
 */
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) {
  SSchemaWrapper *pSW = NULL;
  SMetaDB        *pDB = pMeta->pDB;
  int             ret;
  SSchemaKey      schemaKey = {uid, sver, 0};
  DBT             key = {0};
  DBT             value = {0};

  (void)isinline;

  // Set key/value properties
  key.data = &schemaKey;
  key.size = sizeof(schemaKey);

  // Query and decode. value.data points into memory owned by the DB handle
  // that a later call on the handle may overwrite, so it is decoded before
  // the lock is released.
  metaDBRLock(pDB);
  ret = pDB->pSchemaDB->get(pDB->pSchemaDB, NULL, &key, &value, 0);
  if (ret == 0) {
    pSW = calloc(1, sizeof(*pSW));
    if (pSW != NULL && metaDecodeSchema(value.data, pSW) == NULL) {
      free(pSW->pSchema);
      free(pSW);
      pSW = NULL;
    }
  }
  metaDBULock(pDB);

  if (ret != 0) {
    printf("failed to query schema DB since %s================\n", db_strerror(ret));
    return NULL;
  }

  return pSW;
}

// Iterator over the table DB; wraps the BDB cursor opened by metaOpenTbCursor().
struct SMTbCursor {
  DBC *pCur;  // underlying Berkeley DB cursor on pTbDB
};

/*
 * Open a cursor over the table DB.
 * Returns a heap-allocated cursor to be released with metaCloseTbCursor(),
 * or NULL if memory or the underlying BDB cursor cannot be obtained.
 */
SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
  SMTbCursor *pTbCur = NULL;
  SMetaDB    *pDB = pMeta->pDB;
  int         ret;

  pTbCur = (SMTbCursor *)calloc(1, sizeof(*pTbCur));
  if (pTbCur == NULL) {
    return NULL;
  }

  // Without this check a failed cursor() would leave pCur NULL and
  // metaTbCursorNext() would dereference it.
  ret = pDB->pTbDB->cursor(pDB->pTbDB, NULL, &(pTbCur->pCur), 0);
  if (ret != 0) {
    free(pTbCur);
    return NULL;
  }

  return pTbCur;
}

/* Close the underlying BDB cursor (if any) and free the wrapper. NULL is ignored. */
void metaCloseTbCursor(SMTbCursor *pTbCur) {
  if (pTbCur == NULL) {
    return;
  }

  DBC *pCur = pTbCur->pCur;
  if (pCur != NULL) {
    pCur->close(pCur);
  }
  free(pTbCur);
}

/*
 * Advance the cursor to the next table that is not a super table and return
 * its name; super-table records are decoded, released and skipped.
 * Returns NULL once the cursor's get() stops returning records.
 *
 * The returned name is the string produced by metaDecodeTbInfo(); this
 * function releases such names with free() when skipping, so presumably the
 * caller is expected to free() it — confirm against callers.
 */
char *metaTbCursorNext(SMTbCursor *pTbCur) {
  DBT    key = {0};
  DBT    value = {0};
  STbCfg tbCfg;

  while (pTbCur->pCur->get(pTbCur->pCur, &key, &value, DB_NEXT) == 0) {
    metaDecodeTbInfo(value.data, &tbCfg);

    if (tbCfg.type == META_SUPER_TABLE) {
      // Not reported: drop everything that was decoded and move on.
      free(tbCfg.name);
      free(tbCfg.stbCfg.pTagSchema);
      continue;
    }

    if (tbCfg.type == META_CHILD_TABLE) {
      // Only the name is handed out; the decoded tag row is not needed.
      kvRowFree(tbCfg.ctbCfg.pTag);
    }
    return tbCfg.name;
  }

  return NULL;
}

/*
 * Build an STSchema for table `uid` at schema version `sver`.
 *
 * Child tables have no schema record of their own, so their super table's
 * uid is used for the schema lookup. Returns the schema produced by the
 * schema builder, or NULL if the table or its schema cannot be found.
 */
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
  STSchemaBuilder sb;
  STSchema       *pTSchema = NULL;
  SSchema        *pSchema;
  SSchemaWrapper *pSW;
  STbCfg         *pTbCfg;
  tb_uid_t        quid;

  pTbCfg = metaGetTbInfoByUid(pMeta, uid);
  if (pTbCfg == NULL) {
    return NULL;
  }

  if (pTbCfg->type == META_CHILD_TABLE) {
    quid = pTbCfg->ctbCfg.suid;
  } else {
    quid = uid;
  }

  // The config was only needed to resolve quid. Release it the same way
  // metaTbCursorNext() releases decoded configs.
  if (pTbCfg->type == META_SUPER_TABLE) {
    free(pTbCfg->stbCfg.pTagSchema);
  } else if (pTbCfg->type == META_CHILD_TABLE) {
    kvRowFree(pTbCfg->ctbCfg.pTag);
  }
  free(pTbCfg->name);
  free(pTbCfg);

  pSW = metaGetTableSchema(pMeta, quid, sver, true);
  if (pSW == NULL) {
    return NULL;
  }

  // Rebuild a schema
  tdInitTSchemaBuilder(&sb, 0);
  for (int32_t i = 0; i < pSW->nCols; i++) {
    pSchema = pSW->pSchema + i;
    tdAddColToSchema(&sb, pSchema->type, pSchema->colId, pSchema->bytes);
  }
  pTSchema = tdGetSchemaFromBuilder(&sb);
  tdDestroyTSchemaBuilder(&sb);

  // Columns were passed to the builder by value, so the wrapper is no longer needed.
  free(pSW->pSchema);
  free(pSW);

  return pTSchema;
}

struct SMCtbCursor {
L
fix  
Liu Jicong 已提交
673
  DBC     *pCur;
H
more  
Hongze Cheng 已提交
674 675 676 677 678
  tb_uid_t suid;
};

/*
 * Open a cursor on the child-table index and remember `uid` as the
 * super-table uid. Returns a heap-allocated cursor, or NULL if memory or the
 * underlying BDB cursor cannot be obtained.
 */
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
  DB          *pIdx = pMeta->pDB->pCtbIdx;
  SMCtbCursor *pCursor = (SMCtbCursor *)calloc(1, sizeof(SMCtbCursor));

  if (pCursor == NULL) {
    return NULL;
  }
  pCursor->suid = uid;

  if (pIdx->cursor(pIdx, NULL, &pCursor->pCur, 0) != 0) {
    free(pCursor);
    return NULL;
  }

  return pCursor;
}

/* Close the underlying BDB cursor (if any) and free the wrapper. NULL is ignored. */
void metaCloseCtbCurosr(SMCtbCursor *pCtbCur) {
  if (pCtbCur == NULL) {
    return;
  }

  DBC *pCur = pCtbCur->pCur;
  if (pCur != NULL) {
    pCur->close(pCur);
  }
  free(pCtbCur);
}

707
/*
 * Return the uid of the next child table of the cursor's super table, or 0
 * when there are no more.
 *
 * DB_NEXT treats the secondary key as an output, so the cursor walks every
 * entry of the child-table index; entries whose secondary key is not the
 * cursor's suid are skipped here.
 * NOTE(review): positioning once with DB_SET and continuing with DB_NEXT_DUP
 * would avoid the full scan but needs per-cursor "started" state.
 */
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
  DBT skey = {0};
  DBT pkey = {0};
  DBT pval = {0};

  while (pCtbCur->pCur->pget(pCtbCur->pCur, &skey, &pkey, &pval, DB_NEXT) == 0) {
    if (skey.size != sizeof(pCtbCur->suid) || memcmp(skey.data, &(pCtbCur->suid), sizeof(pCtbCur->suid)) != 0) {
      continue;  // child of a different super table
    }

    tb_uid_t id;
    memcpy(&id, pkey.data, sizeof(id));  // primary key is the child table's uid
    assert(id != 0);
    return id;
  }

  return 0;
}

/* Take the meta DB lock for writing; a no-op when IMPL_WITH_LOCK is 0. */
static void metaDBWLock(SMetaDB *pDB) {
#if IMPL_WITH_LOCK
  pthread_rwlock_wrlock(&pDB->rwlock);
#else
  (void)pDB;
#endif
}

/* Take the meta DB lock for reading; a no-op when IMPL_WITH_LOCK is 0. */
static void metaDBRLock(SMetaDB *pDB) {
#if IMPL_WITH_LOCK
  pthread_rwlock_rdlock(&pDB->rwlock);
#else
  (void)pDB;
#endif
}

/* Release the meta DB lock (read or write); a no-op when IMPL_WITH_LOCK is 0. */
static void metaDBULock(SMetaDB *pDB) {
#if IMPL_WITH_LOCK
  pthread_rwlock_unlock(&pDB->rwlock);
#else
  (void)pDB;
#endif
}