metaQuery.c 31.9 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "meta.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
// Prepare a reader for use: zero the whole structure, attach it to pMeta and
// remember the caller's flags. Unless META_READER_NOLOCK is set in flags,
// metaRLock(pMeta) is called before returning.
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) {
  memset(pReader, 0, sizeof(*pReader));
  pReader->pMeta = pMeta;
  pReader->flags = flags;

  if ((flags & META_READER_NOLOCK) != 0) {
    return;  // caller asked for lock-free access
  }
  metaRLock(pMeta);
}
H
Hongze Cheng 已提交
26

27 28 29 30 31 32 33
// Drop the lock taken by the reader, if any, while keeping the reader usable.
// META_READER_NOLOCK is set afterwards so that a later metaReaderClear() (or a
// second call to this function) does not unlock again.
void metaReaderReleaseLock(SMetaReader *pReader) {
  if (pReader->pMeta == NULL || (pReader->flags & META_READER_NOLOCK) != 0) {
    return;  // nothing is locked through this reader
  }

  metaULock(pReader->pMeta);
  pReader->flags |= META_READER_NOLOCK;
}

H
Hongze Cheng 已提交
34
// Release everything a reader holds: the lock (when the reader is bound to a
// meta and META_READER_NOLOCK is not set), the decoder state and the value
// buffer filled by the lookups.
void metaReaderClear(SMetaReader *pReader) {
  bool holdsLock = (pReader->pMeta != NULL) && ((pReader->flags & META_READER_NOLOCK) == 0);

  if (holdsLock) {
    metaULock(pReader->pMeta);
  }

  tDecoderClear(&pReader->coder);
  tdbFree(pReader->pBuf);
}

H
Hongze Cheng 已提交
42
// Load the table entry stored under (version, uid) in table.db and decode it
// into pReader->me.
//
// Returns 0 on success and -1 on failure. When the key is not present, terrno
// is set to TSDB_CODE_PAR_TABLE_NOT_EXIST; a decode failure returns -1 without
// touching terrno here.
int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t uid) {
  STbDbKey key = {.version = version, .uid = uid};
  SMeta   *pMeta = pReader->pMeta;

  // query table.db
  if (tdbTbGet(pMeta->pTbDb, &key, sizeof(key), &pReader->pBuf, &pReader->szBuf) < 0) {
    terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
    return -1;
  }

  // decode the entry out of the buffer that was just filled
  tDecoderInit(&pReader->coder, pReader->pBuf, pReader->szBuf);
  return (metaDecodeEntry(&pReader->coder, &pReader->me) < 0) ? -1 : 0;
}

dengyihao's avatar
dengyihao 已提交
65
// int metaGetTableEntryByUidTest(void* meta, SArray *uidList) {
wmmhello's avatar
wmmhello 已提交
66
//
dengyihao's avatar
dengyihao 已提交
67 68 69 70 71
//   SArray* readerList = taosArrayInit(taosArrayGetSize(uidList), sizeof(SMetaReader));
//   SArray* uidVersion = taosArrayInit(taosArrayGetSize(uidList), sizeof(STbDbKey));
//   SMeta  *pMeta = meta;
//   int64_t version;
//   SHashObj *uHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
wmmhello's avatar
wmmhello 已提交
72
//
dengyihao's avatar
dengyihao 已提交
73 74 75 76 77 78 79 80 81 82
//   int64_t stt1 = taosGetTimestampUs();
//   for(int i = 0; i < taosArrayGetSize(uidList); i++) {
//     void* ppVal = NULL;
//     int vlen = 0;
//     uint64_t *  uid = taosArrayGet(uidList, i);
//     // query uid.idx
//     if (tdbTbGet(pMeta->pUidIdx, uid, sizeof(*uid), &ppVal, &vlen) < 0) {
//       continue;
//     }
//     version = *(int64_t *)ppVal;
wmmhello's avatar
wmmhello 已提交
83
//
dengyihao's avatar
dengyihao 已提交
84 85 86 87 88 89
//     STbDbKey tbDbKey = {.version = version, .uid = *uid};
//     taosArrayPush(uidVersion, &tbDbKey);
//     taosHashPut(uHash, uid, sizeof(int64_t), ppVal, sizeof(int64_t));
//   }
//   int64_t stt2 = taosGetTimestampUs();
//   qDebug("metaGetTableEntryByUidTest1 rows:%d, cost:%ld us", taosArrayGetSize(uidList), stt2-stt1);
wmmhello's avatar
wmmhello 已提交
90
//
dengyihao's avatar
dengyihao 已提交
91 92 93 94 95
//   TBC        *pCur = NULL;
//   tdbTbcOpen(pMeta->pTbDb, &pCur, NULL);
//   tdbTbcMoveToFirst(pCur);
//   void *pKey = NULL;
//   int   kLen = 0;
wmmhello's avatar
wmmhello 已提交
96
//
dengyihao's avatar
dengyihao 已提交
97 98 99 100 101 102 103 104 105 106
//   while(1){
//     SMetaReader pReader = {0};
//     int32_t ret = tdbTbcNext(pCur, &pKey, &kLen, &pReader.pBuf, &pReader.szBuf);
//     if (ret < 0) break;
//     STbDbKey *tmp = (STbDbKey*)pKey;
//     int64_t *ver = (int64_t*)taosHashGet(uHash, &tmp->uid, sizeof(int64_t));
//     if(ver == NULL || *ver != tmp->version) continue;
//     taosArrayPush(readerList, &pReader);
//   }
//   tdbTbcClose(pCur);
wmmhello's avatar
wmmhello 已提交
107
//
dengyihao's avatar
dengyihao 已提交
108 109 110 111 112
//   taosArrayClear(readerList);
//   int64_t stt3 = taosGetTimestampUs();
//   qDebug("metaGetTableEntryByUidTest2 rows:%d, cost:%ld us", taosArrayGetSize(uidList), stt3-stt2);
//   for(int i = 0; i < taosArrayGetSize(uidVersion); i++) {
//     SMetaReader pReader = {0};
wmmhello's avatar
wmmhello 已提交
113
//
dengyihao's avatar
dengyihao 已提交
114 115 116 117 118 119 120 121 122
//     STbDbKey *tbDbKey = taosArrayGet(uidVersion, i);
//     // query table.db
//     if (tdbTbGet(pMeta->pTbDb, tbDbKey, sizeof(STbDbKey), &pReader.pBuf, &pReader.szBuf) < 0) {
//       continue;
//     }
//     taosArrayPush(readerList, &pReader);
//   }
//   int64_t stt4 = taosGetTimestampUs();
//   qDebug("metaGetTableEntryByUidTest3 rows:%d, cost:%ld us", taosArrayGetSize(uidList), stt4-stt3);
wmmhello's avatar
wmmhello 已提交
123
//
dengyihao's avatar
dengyihao 已提交
124 125 126 127 128
//   for(int i = 0; i < taosArrayGetSize(readerList); i++){
//     SMetaReader* pReader  = taosArrayGet(readerList, i);
//     metaReaderInit(pReader, meta, 0);
//     // decode the entry
//     tDecoderInit(&pReader->coder, pReader->pBuf, pReader->szBuf);
wmmhello's avatar
wmmhello 已提交
129
//
dengyihao's avatar
dengyihao 已提交
130 131 132 133 134 135 136 137
//     if (metaDecodeEntry(&pReader->coder, &pReader->me) < 0) {
//     }
//     metaReaderClear(pReader);
//   }
//   int64_t stt5 = taosGetTimestampUs();
//   qDebug("metaGetTableEntryByUidTest4 rows:%d, cost:%ld us", taosArrayGetSize(readerList), stt5-stt4);
//   return 0;
// }
wmmhello's avatar
wmmhello 已提交
138

H
Hongze Cheng 已提交
139
// Report whether uid has an entry in uid.idx. The lookup is done under the
// meta read lock and no value is fetched (NULL output buffers).
bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid) {
  metaRLock(pMeta);
  bool found = tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), NULL, NULL) >= 0;
  metaULock(pMeta);

  return found;
}

H
Hongze Cheng 已提交
154
// Resolve uid to the version recorded in uid.idx, then load and decode that
// entry via metaGetTableEntryByVersion().
//
// Returns 0 on success, -1 on failure; a missing uid sets terrno to
// TSDB_CODE_PAR_TABLE_NOT_EXIST.
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
  SMeta *pMeta = pReader->pMeta;

  // query uid.idx
  if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pReader->pBuf, &pReader->szBuf) < 0) {
    terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
    return -1;
  }

  // Copy the version out first: the next lookup reuses pReader->pBuf.
  int64_t entryVersion = ((SUidIdxVal *)pReader->pBuf)[0].version;
  return metaGetTableEntryByVersion(pReader, entryVersion, uid);
}

H
Hongze Cheng 已提交
168
// Resolve a table name to its uid through name.idx (the key includes the
// terminating NUL), then load the entry via metaGetTableEntryByUid().
//
// Returns 0 on success, -1 on failure; an unknown name sets terrno to
// TSDB_CODE_PAR_TABLE_NOT_EXIST.
int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
  SMeta *pMeta = pReader->pMeta;

  // query name.idx
  if (tdbTbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pReader->pBuf, &pReader->szBuf) < 0) {
    terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
    return -1;
  }

  // Copy the uid out first: the next lookup reuses pReader->pBuf.
  tb_uid_t tableUid = *(tb_uid_t *)pReader->pBuf;
  return metaGetTableEntryByUid(pReader, tableUid);
}

H
Hongze Cheng 已提交
182
// Look up the uid stored for name in name.idx, under the meta read lock.
// Returns the uid, or 0 when the lookup does not succeed.
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {
  tb_uid_t result = 0;
  void    *pVal = NULL;
  int      valLen = 0;

  metaRLock(pMeta);

  if (tdbTbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pVal, &valLen) == 0) {
    result = *(tb_uid_t *)pVal;
    tdbFree(pVal);  // the value buffer is owned by this function
  }

  metaULock(pMeta);

  return result;
}

dengyihao's avatar
dengyihao 已提交
199
int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName) {
200
  int         code = 0;
D
dapan1121 已提交
201
  SMetaReader mr = {0};
dengyihao's avatar
dengyihao 已提交
202
  metaReaderInit(&mr, (SMeta *)meta, 0);
203 204 205 206 207
  code = metaGetTableEntryByUid(&mr, uid);
  if (code < 0) {
    metaReaderClear(&mr);
    return -1;
  }
D
dapan1121 已提交
208 209 210 211 212 213

  STR_TO_VARSTR(tbName, mr.me.name);
  metaReaderClear(&mr);

  return 0;
}
dengyihao's avatar
dengyihao 已提交
214
int metaGetTableUidByName(void *meta, char *tbName, int64_t *uid) {
dengyihao's avatar
dengyihao 已提交
215 216 217
  int         code = 0;
  SMetaReader mr = {0};
  metaReaderInit(&mr, (SMeta *)meta, 0);
dengyihao's avatar
dengyihao 已提交
218 219 220 221

  SMetaReader *pReader = &mr;

  // query name.idx
dengyihao's avatar
dengyihao 已提交
222
  if (tdbTbGet(pReader->pMeta->pNameIdx, tbName, strlen(tbName) + 1, &pReader->pBuf, &pReader->szBuf) < 0) {
dengyihao's avatar
dengyihao 已提交
223
    terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
dengyihao's avatar
dengyihao 已提交
224 225 226 227
    metaReaderClear(&mr);
    return -1;
  }

dengyihao's avatar
dengyihao 已提交
228
  *uid = *(tb_uid_t *)pReader->pBuf;
dengyihao's avatar
dengyihao 已提交
229

dengyihao's avatar
dengyihao 已提交
230 231 232 233
  metaReaderClear(&mr);

  return 0;
}
D
dapan1121 已提交
234

dengyihao's avatar
dengyihao 已提交
235 236 237 238 239
// Read the entry named tbName and, when found, store its type in *tbType.
// Returns the result of metaGetTableEntryByName(); *tbType is written only
// when that result is 0.
int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType) {
  SMetaReader reader = {0};
  metaReaderInit(&reader, (SMeta *)meta, 0);

  int rc = metaGetTableEntryByName(&reader, tbName);
  if (rc == 0) {
    *tbType = reader.me.type;
  }

  metaReaderClear(&reader);
  return rc;
}

H
Hongze Cheng 已提交
247
// Placeholder: not implemented yet. Reads the reader's meta handle and
// always returns 0.
int metaReadNext(SMetaReader *pReader) {
  SMeta *pMeta = pReader->pMeta;
  (void)pMeta;  // TODO: unused until the iteration logic is written

  return 0;
}

#if 1  // ===================================================
H
Hongze Cheng 已提交
256 257 258 259 260 261 262 263
// Allocate a table cursor over uid.idx, positioned at the first entry.
//
// The embedded reader is initialised with flags 0, so the meta read lock is
// taken here and stays held until metaCloseTbCursor().
//
// Returns the new cursor, or NULL when the allocation or the underlying
// tdbTbcOpen() fails. On an open failure the reader (and with it the lock)
// and the allocation are released before returning.
SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
  SMTbCursor *pTbCur = (SMTbCursor *)taosMemoryCalloc(1, sizeof(*pTbCur));
  if (pTbCur == NULL) {
    return NULL;
  }

  metaReaderInit(&pTbCur->mr, pMeta, 0);

  // Without this check a failed open would hand an unset cursor to
  // tdbTbcMoveToFirst() and leave the read lock held forever.
  if (tdbTbcOpen(pMeta->pUidIdx, &pTbCur->pDbc, NULL) < 0) {
    metaReaderClear(&pTbCur->mr);
    taosMemoryFree(pTbCur);
    return NULL;
  }

  tdbTbcMoveToFirst(pTbCur->pDbc);

  return pTbCur;
}

// Destroy a cursor created by metaOpenTbCursor(): free the key/value buffers,
// clear the embedded reader, close the tdb cursor when one was opened, and
// free the cursor itself. A NULL cursor is ignored.
void metaCloseTbCursor(SMTbCursor *pTbCur) {
  if (pTbCur == NULL) {
    return;
  }

  tdbFree(pTbCur->pKey);
  tdbFree(pTbCur->pVal);
  metaReaderClear(&pTbCur->mr);

  if (pTbCur->pDbc != NULL) {
    tdbTbcClose(pTbCur->pDbc);
  }

  taosMemoryFree(pTbCur);
}

H
Hongze Cheng 已提交
285
// Advance the cursor to the next table that is not a super table and decode
// its entry into pTbCur->mr.me.
//
// Returns 0 when an entry has been loaded, and -1 when the underlying cursor
// cannot advance or when the entry for the current key cannot be fetched or
// decoded.
int metaTbCursorNext(SMTbCursor *pTbCur) {
  for (;;) {
    int ret = tdbTbcNext(pTbCur->pDbc, &pTbCur->pKey, &pTbCur->kLen, &pTbCur->pVal, &pTbCur->vLen);
    if (ret < 0) {
      return -1;
    }

    // Drop the decoder state of the previous entry before decoding again.
    tDecoderClear(&pTbCur->mr.coder);

    int64_t  version = ((SUidIdxVal *)pTbCur->pVal)[0].version;
    tb_uid_t uid = *(tb_uid_t *)pTbCur->pKey;

    // A failed fetch leaves mr.me holding the previous entry, so inspecting
    // mr.me.type afterwards would act on stale data; report the failure.
    if (metaGetTableEntryByVersion(&pTbCur->mr, version, uid) < 0) {
      return -1;
    }

    if (pTbCur->mr.me.type == TSDB_SUPER_TABLE) {
      continue;  // super tables are skipped
    }

    break;
  }

  return 0;
}

309
// Return a heap-allocated clone of the schema of table uid at schema version
// sver (sver == -1 selects the schema stored in the current entry).
//
// Resolution order:
//   1. uid.idx gives the current entry version; the entry is read from
//      table.db and decoded.
//   2. A child table is redirected to its super table (ctbEntry.suid) and the
//      lookup restarts.
//   3. For a super or normal table whose current schema version matches, that
//      schema is cloned.
//   4. For a super table with another version, table.db is scanned backwards
//      from (sver, INT64_MAX) and the first entry with the same uid is used.
//   5. Otherwise the schema is read from the schema db (pSkmDb).
//
// When lock is non-zero the meta read lock is held for the whole call.
// Returns NULL when any lookup or decode step fails. The caller owns the
// returned wrapper.
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) {
  void           *pData = NULL;
  int             nData = 0;
  int64_t         version;
  SSchemaWrapper  schema = {0};
  SSchemaWrapper *pSchema = NULL;  // stays NULL on every failure path
  SDecoder        dc = {0};

  if (lock) {
    metaRLock(pMeta);
  }

_query:
  if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData) < 0) {
    goto _exit;
  }

  version = ((SUidIdxVal *)pData)[0].version;

  // The entry must exist and decode cleanly; otherwise `me` below would be
  // interpreted from stale or partial data.
  if (tdbTbGet(pMeta->pTbDb, &(STbDbKey){.uid = uid, .version = version}, sizeof(STbDbKey), &pData, &nData) < 0) {
    goto _exit;
  }

  SMetaEntry me = {0};
  tDecoderInit(&dc, pData, nData);
  if (metaDecodeEntry(&dc, &me) < 0) {
    goto _exit;
  }

  if (me.type == TSDB_SUPER_TABLE) {
    if (sver == -1 || sver == me.stbEntry.schemaRow.version) {
      pSchema = tCloneSSchemaWrapper(&me.stbEntry.schemaRow);
      goto _exit;
    }

    {  // Traverse to find the previous qualified data
      TBC *pCur = NULL;
      if (tdbTbcOpen(pMeta->pTbDb, &pCur, NULL) < 0) {
        goto _exit;
      }

      STbDbKey key = {.version = sver, .uid = INT64_MAX};
      int      c = 0;
      tdbTbcMoveTo(pCur, &key, sizeof(key), &c);
      if (c < 0) {
        tdbTbcMoveToPrev(pCur);
      }

      void *pKey = NULL;
      void *pVal = NULL;
      int   vLen = 0, kLen = 0;
      int   found = 0;

      while (tdbTbcPrev(pCur, &pKey, &kLen, &pVal, &vLen) >= 0) {
        STbDbKey *tmp = (STbDbKey *)pKey;
        if (tmp->uid != uid) {
          continue;
        }

        SDecoder   dcNew = {0};
        SMetaEntry meNew = {0};
        tDecoderInit(&dcNew, pVal, vLen);
        // Clone only from an entry that decoded successfully.
        if (metaDecodeEntry(&dcNew, &meNew) >= 0) {
          pSchema = tCloneSSchemaWrapper(&meNew.stbEntry.schemaRow);
        }
        tDecoderClear(&dcNew);
        found = 1;
        break;
      }

      tdbFree(pKey);
      tdbFree(pVal);
      tdbTbcClose(pCur);

      if (found) {
        goto _exit;
      }
    }
  } else if (me.type == TSDB_CHILD_TABLE) {
    // A child table uses its super table's schema: restart with the suid.
    uid = me.ctbEntry.suid;
    tDecoderClear(&dc);
    goto _query;
  } else {
    if (sver == -1 || sver == me.ntbEntry.schemaRow.version) {
      pSchema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow);
      goto _exit;
    }
  }
  tDecoderClear(&dc);

  // query from skm db
  if (tdbTbGet(pMeta->pSkmDb, &(SSkmDbKey){.uid = uid, .sver = sver}, sizeof(SSkmDbKey), &pData, &nData) < 0) {
    goto _exit;
  }

  tDecoderInit(&dc, pData, nData);
  tDecodeSSchemaWrapperEx(&dc, &schema);
  pSchema = tCloneSSchemaWrapper(&schema);

_exit:
  // Single cleanup path for success and failure.
  tDecoderClear(&dc);
  if (lock) {
    metaULock(pMeta);
  }
  tdbFree(pData);
  return pSchema;
}

H
Hongze Cheng 已提交
412 413 414
// Collect into uidList the uid of every ttl.idx entry reached by walking
// backwards from the key (dtime = ttl, uid = INT64_MAX).
//
// Returns the (negative) result of tdbTbcOpen() when the cursor cannot be
// opened, otherwise 0.
int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList) {
  TBC *pTtlCur;
  int  rc = tdbTbcOpen(pMeta->pTtlIdx, &pTtlCur, NULL);
  if (rc < 0) {
    return rc;
  }

  // Seek to the upper bound for this ttl value.
  STtlIdxKey bound = {0};
  bound.dtime = ttl;
  bound.uid = INT64_MAX;

  int cmp = 0;
  tdbTbcMoveTo(pTtlCur, &bound, sizeof(bound), &cmp);
  if (cmp < 0) {
    tdbTbcMoveToPrev(pTtlCur);
  }

  void *pKey = NULL;
  int   kLen = 0;
  while (tdbTbcPrev(pTtlCur, &pKey, &kLen, NULL, NULL) >= 0) {
    STtlIdxKey entry = *(STtlIdxKey *)pKey;
    taosArrayPush(uidList, &entry.uid);
  }

  tdbFree(pKey);
  tdbTbcClose(pTtlCur);
  return 0;
}

H
Hongze Cheng 已提交
443
struct SMCtbCursor {
H
Hongze Cheng 已提交
444 445
  SMeta   *pMeta;
  TBC     *pCur;
H
Hongze Cheng 已提交
446
  tb_uid_t suid;
H
Hongze Cheng 已提交
447 448
  void    *pKey;
  void    *pVal;
H
Hongze Cheng 已提交
449 450 451 452
  int      kLen;
  int      vLen;
};

453
// Open a cursor over the child-table index, positioned at the first key of
// super table uid (seek key: suid = uid, uid = INT64_MIN).
//
// When lock is non-zero the meta read lock is taken here and is expected to be
// released by metaCloseCtbCursor() called with the same lock value.
//
// Returns the cursor, or NULL when the allocation or tdbTbcOpen() fails; on
// failure the lock is released only if this call acquired it.
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid, int lock) {
  SMCtbCursor *pCtbCur = NULL;
  SCtbIdxKey   ctbIdxKey;
  int          ret = 0;
  int          c = 0;

  pCtbCur = (SMCtbCursor *)taosMemoryCalloc(1, sizeof(*pCtbCur));
  if (pCtbCur == NULL) {
    return NULL;
  }

  pCtbCur->pMeta = pMeta;
  pCtbCur->suid = uid;
  if (lock) {
    metaRLock(pMeta);
  }

  ret = tdbTbcOpen(pMeta->pCtbIdx, &pCtbCur->pCur, NULL);
  if (ret < 0) {
    // Unlock only what was locked above: with lock == 0 the caller manages
    // the lock and an unconditional unlock would release a lock this call
    // never took.
    if (lock) {
      metaULock(pMeta);
    }
    taosMemoryFree(pCtbCur);
    return NULL;
  }

  // move to the suid
  ctbIdxKey.suid = uid;
  ctbIdxKey.uid = INT64_MIN;
  tdbTbcMoveTo(pCtbCur->pCur, &ctbIdxKey, sizeof(ctbIdxKey), &c);
  if (c > 0) {
    tdbTbcMoveToNext(pCtbCur->pCur);
  }

  return pCtbCur;
}

488
// Destroy a child-table cursor. When lock is non-zero and the cursor has a
// meta handle, the meta lock is released first. The tdb cursor and the
// key/value buffers are released only when a tdb cursor exists. A NULL cursor
// is ignored.
void metaCloseCtbCursor(SMCtbCursor *pCtbCur, int lock) {
  if (pCtbCur == NULL) {
    return;
  }

  if (lock && pCtbCur->pMeta != NULL) {
    metaULock(pCtbCur->pMeta);
  }

  if (pCtbCur->pCur != NULL) {
    tdbTbcClose(pCtbCur->pCur);

    tdbFree(pCtbCur->pKey);
    tdbFree(pCtbCur->pVal);
  }

  taosMemoryFree(pCtbCur);
}

// Step the child-table cursor forward and return the child uid of the key it
// lands on. Returns 0 when the cursor cannot advance or when the key's suid is
// greater than the suid the cursor was opened for.
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
  if (tdbTbcNext(pCtbCur->pCur, &pCtbCur->pKey, &pCtbCur->kLen, &pCtbCur->pVal, &pCtbCur->vLen) < 0) {
    return 0;
  }

  const SCtbIdxKey *pIdxKey = pCtbCur->pKey;
  if (pIdxKey->suid > pCtbCur->suid) {
    return 0;  // walked past the requested super table
  }

  return pIdxKey->uid;
}

C
Cary Xu 已提交
519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535
// Cursor over the super-table index (pSuidIdx).
struct SMStbCursor {
  SMeta   *pMeta;  // owning meta; read-locked on open, unlocked on close
  TBC     *pCur;   // underlying tdb cursor
  tb_uid_t suid;   // uid the cursor was positioned at when opened
  void    *pKey;   // key buffer filled by tdbTbcNext(), freed on close
  void    *pVal;   // value buffer filled by tdbTbcNext(), freed on close
  int      kLen;   // length of the data in pKey
  int      vLen;   // length of the data in pVal
};

// Open a cursor over the super-table index, positioned at suid. The meta read
// lock is taken here and released by metaCloseStbCursor().
//
// Returns the cursor, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when the
// allocation or tdbTbcOpen() fails (the lock is released again in the latter
// case).
SMStbCursor *metaOpenStbCursor(SMeta *pMeta, tb_uid_t suid) {
  int cmp = 0;

  SMStbCursor *pCursor = (SMStbCursor *)taosMemoryCalloc(1, sizeof(*pCursor));
  if (pCursor == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pCursor->pMeta = pMeta;
  pCursor->suid = suid;
  metaRLock(pMeta);

  if (tdbTbcOpen(pMeta->pSuidIdx, &pCursor->pCur, NULL) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    metaULock(pMeta);
    taosMemoryFree(pCursor);
    return NULL;
  }

  // move to the suid
  tdbTbcMoveTo(pCursor->pCur, &suid, sizeof(suid), &cmp);
  if (cmp > 0) {
    tdbTbcMoveToNext(pCursor->pCur);
  }

  return pCursor;
}

// Destroy a super-table cursor: release the meta lock when the cursor has a
// meta handle, close the tdb cursor and free the key/value buffers when a tdb
// cursor exists, then free the cursor. A NULL cursor is ignored.
void metaCloseStbCursor(SMStbCursor *pStbCur) {
  if (pStbCur == NULL) {
    return;
  }

  if (pStbCur->pMeta != NULL) {
    metaULock(pStbCur->pMeta);
  }

  if (pStbCur->pCur != NULL) {
    tdbTbcClose(pStbCur->pCur);

    tdbFree(pStbCur->pKey);
    tdbFree(pStbCur->pVal);
  }

  taosMemoryFree(pStbCur);
}

// Step the super-table cursor forward and return the uid stored in the key it
// lands on, or 0 when the cursor cannot advance.
tb_uid_t metaStbCursorNext(SMStbCursor *pStbCur) {
  if (tdbTbcNext(pStbCur->pCur, &pStbCur->pKey, &pStbCur->kLen, &pStbCur->pVal, &pStbCur->vLen) < 0) {
    return 0;
  }

  tb_uid_t *pUid = (tb_uid_t *)pStbCur->pKey;
  return *pUid;
}

585
// Build an STSchema for table uid at schema version sver.
//
// The column list comes from metaGetTableSchema() (lock is forwarded to it)
// and is fed column by column into a schema builder. The intermediate wrapper
// is freed before returning. Returns NULL when no schema wrapper is available;
// otherwise returns whatever tdGetSchemaFromBuilder() produces.
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) {
  SSchemaWrapper *pWrapper = metaGetTableSchema(pMeta, uid, sver, lock);
  if (pWrapper == NULL) {
    return NULL;
  }

  STSchemaBuilder builder = {0};
  tdInitTSchemaBuilder(&builder, pWrapper->version);

  for (int iCol = 0; iCol < pWrapper->nCols; ++iCol) {
    SSchema *pCol = &pWrapper->pSchema[iCol];
    tdAddColToSchema(&builder, pCol->type, pCol->flags, pCol->colId, pCol->bytes);
  }

  STSchema *pResult = tdGetSchemaFromBuilder(&builder);
  tdDestroyTSchemaBuilder(&builder);

  // The wrapper and its column array were allocated for this call only.
  taosMemoryFree(pWrapper->pSchema);
  taosMemoryFree(pWrapper);

  return pResult;
}

H
Hongze Cheng 已提交
609
// Build an STSchema for a table from the schema db and store it in *ppTSchema.
//
// The schema is keyed by suid when suid is non-zero, otherwise by uid. When
// sver <= 0 the version is resolved first: from metaGetInfo() if available,
// otherwise by seeking the schema db to (uid, INT32_MAX) and taking the
// version of the entry found there.
//
// Returns 0 on success. Returns TSDB_CODE_NOT_FOUND when no matching schema
// entry exists and TSDB_CODE_OUT_OF_MEMORY when the schema-db cursor cannot be
// opened (the same code the other cursor openers in this file report for
// that failure). *ppTSchema is written only on success.
int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema **ppTSchema) {
  int32_t code = 0;

  void     *pData = NULL;
  int       nData = 0;
  SSkmDbKey skmDbKey;
  if (sver <= 0) {
    SMetaInfo info;
    if (metaGetInfo(pMeta, suid ? suid : uid, &info) == 0) {
      sver = info.skmVer;
    } else {
      TBC *pSkmDbC = NULL;
      int  c;

      skmDbKey.uid = suid ? suid : uid;
      skmDbKey.sver = INT32_MAX;

      // A failed open leaves pSkmDbC unusable; stop before touching it.
      if (tdbTbcOpen(pMeta->pSkmDb, &pSkmDbC, NULL) < 0) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _exit;
      }
      metaRLock(pMeta);

      if (tdbTbcMoveTo(pSkmDbC, &skmDbKey, sizeof(skmDbKey), &c) < 0) {
        metaULock(pMeta);
        tdbTbcClose(pSkmDbC);
        code = TSDB_CODE_NOT_FOUND;
        goto _exit;
      }

      ASSERT(c);

      if (c < 0) {
        tdbTbcMoveToPrev(pSkmDbC);
      }

      const void *pKey = NULL;
      int32_t     nKey = 0;
      tdbTbcGet(pSkmDbC, &pKey, &nKey, NULL, NULL);

      // pKey stays NULL if the cursor yielded nothing; treat that like a
      // key belonging to another table.
      if (pKey == NULL || ((SSkmDbKey *)pKey)->uid != skmDbKey.uid) {
        metaULock(pMeta);
        tdbTbcClose(pSkmDbC);
        code = TSDB_CODE_NOT_FOUND;
        goto _exit;
      }

      sver = ((SSkmDbKey *)pKey)->sver;

      metaULock(pMeta);
      tdbTbcClose(pSkmDbC);
    }
  }

  ASSERT(sver > 0);

  skmDbKey.uid = suid ? suid : uid;
  skmDbKey.sver = sver;
  metaRLock(pMeta);
  if (tdbTbGet(pMeta->pSkmDb, &skmDbKey, sizeof(SSkmDbKey), &pData, &nData) < 0) {
    metaULock(pMeta);
    code = TSDB_CODE_NOT_FOUND;
    goto _exit;
  }
  metaULock(pMeta);

  // decode
  SDecoder        dc = {0};
  SSchemaWrapper  schema = {0};  // zeroed so pSchema is well defined for the free below
  SSchemaWrapper *pSchemaWrapper = &schema;

  tDecoderInit(&dc, pData, nData);
  tDecodeSSchemaWrapper(&dc, pSchemaWrapper);
  tDecoderClear(&dc);
  tdbFree(pData);

  // convert
  STSchemaBuilder sb = {0};

  tdInitTSchemaBuilder(&sb, pSchemaWrapper->version);
  for (int i = 0; i < pSchemaWrapper->nCols; i++) {
    SSchema *pSchema = pSchemaWrapper->pSchema + i;
    tdAddColToSchema(&sb, pSchema->type, pSchema->flags, pSchema->colId, pSchema->bytes);
  }
  STSchema *pTSchema = tdGetSchemaFromBuilder(&sb);
  tdDestroyTSchemaBuilder(&sb);

  *ppTSchema = pTSchema;
  taosMemoryFree(pSchemaWrapper->pSchema);

_exit:
  return code;
}

700 701
// N.B. Called by statusReq per second
// Return the vnode's table count as tracked in its statistics: the number of
// child tables plus the number of normal tables.
int64_t metaGetTbNum(SMeta *pMeta) {
  return pMeta->pVnode->config.vndStats.numOfCTables +
         pMeta->pVnode->config.vndStats.numOfNTables;
}

// N.B. Called by statusReq per second
M
Minglei Jin 已提交
711
// Return the cached time-series count plus numOfNTimeSeries.
//
// The cached numOfTimeSeries value is recomputed via vnodeGetTimeSeriesNum()
// when it is not positive, or when the incremented call counter
// (itvTimeSeries) reaches a multiple of 60 * 5. The counter is incremented
// only when the cached value is positive. After a recompute the counter is
// reset to (vgroup id % 100) * 2.
int64_t metaGetTimeSeriesNum(SMeta *pMeta) {
  // sum of (number of columns of stable -  1) * number of ctables (excluding timestamp column)
  bool refresh = pMeta->pVnode->config.vndStats.numOfTimeSeries <= 0;
  if (!refresh) {
    refresh = (++pMeta->pVnode->config.vndStats.itvTimeSeries % (60 * 5)) == 0;
  }

  if (refresh) {
    int64_t num = 0;
    vnodeGetTimeSeriesNum(pMeta->pVnode, &num);
    pMeta->pVnode->config.vndStats.numOfTimeSeries = num;

    pMeta->pVnode->config.vndStats.itvTimeSeries = (TD_VID(pMeta->pVnode) % 100) * 2;
  }

  return pMeta->pVnode->config.vndStats.numOfTimeSeries + pMeta->pVnode->config.vndStats.numOfNTimeSeries;
}
H
Hongze Cheng 已提交
724

C
Cary Xu 已提交
725
typedef struct {
H
Hongze Cheng 已提交
726 727
  SMeta   *pMeta;
  TBC     *pCur;
C
Cary Xu 已提交
728
  tb_uid_t uid;
H
Hongze Cheng 已提交
729 730
  void    *pKey;
  void    *pVal;
C
Cary Xu 已提交
731 732 733 734 735 736 737 738 739 740 741 742 743
  int      kLen;
  int      vLen;
} SMSmaCursor;

// Open a cursor over the SMA index, positioned at the first key of table uid
// (seek key: uid = uid, smaUid = INT64_MIN). The meta read lock is taken here
// and released by metaCloseSmaCursor().
//
// Returns the cursor, or NULL when the allocation (terrno =
// TSDB_CODE_OUT_OF_MEMORY) or tdbTbcOpen() fails; the lock is released again
// in the latter case.
SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) {
  SMSmaCursor *pSmaCur = NULL;
  SSmaIdxKey   smaIdxKey;
  int          ret = 0;
  int          c = 0;  // initialised like the sibling cursors: it is read even if the seek leaves it unset

  pSmaCur = (SMSmaCursor *)taosMemoryCalloc(1, sizeof(*pSmaCur));
  if (pSmaCur == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pSmaCur->pMeta = pMeta;
  pSmaCur->uid = uid;
  metaRLock(pMeta);

  ret = tdbTbcOpen(pMeta->pSmaIdx, &pSmaCur->pCur, NULL);
  if (ret < 0) {
    metaULock(pMeta);
    taosMemoryFree(pSmaCur);
    return NULL;
  }

  // move to the suid
  smaIdxKey.uid = uid;
  smaIdxKey.smaUid = INT64_MIN;
  tdbTbcMoveTo(pSmaCur->pCur, &smaIdxKey, sizeof(smaIdxKey), &c);
  if (c > 0) {
    tdbTbcMoveToNext(pSmaCur->pCur);
  }

  return pSmaCur;
}
H
Hongze Cheng 已提交
768

C
Cary Xu 已提交
769 770 771 772
// Destroy an SMA cursor: release the meta lock when the cursor has a meta
// handle, close the tdb cursor and free the key/value buffers when a tdb
// cursor exists, then free the cursor. A NULL cursor is ignored.
void metaCloseSmaCursor(SMSmaCursor *pSmaCur) {
  if (pSmaCur == NULL) {
    return;
  }

  if (pSmaCur->pMeta != NULL) {
    metaULock(pSmaCur->pMeta);
  }

  if (pSmaCur->pCur != NULL) {
    tdbTbcClose(pSmaCur->pCur);

    tdbFree(pSmaCur->pKey);
    tdbFree(pSmaCur->pVal);
  }

  taosMemoryFree(pSmaCur);
}

// Step the SMA cursor forward and return the table uid (the `uid` field, not
// `smaUid`) of the key it lands on. Returns 0 when the cursor cannot advance
// or when that uid is greater than the uid the cursor was opened for. The key
// itself stays available in pSmaCur->pKey.
tb_uid_t metaSmaCursorNext(SMSmaCursor *pSmaCur) {
  if (tdbTbcNext(pSmaCur->pCur, &pSmaCur->pKey, &pSmaCur->kLen, &pSmaCur->pVal, &pSmaCur->vLen) < 0) {
    return 0;
  }

  const SSmaIdxKey *pIdxKey = pSmaCur->pKey;
  if (pIdxKey->uid > pSmaCur->uid) {
    return 0;  // walked past the requested table
  }

  return pIdxKey->uid;
}

// Collect the SMA definitions attached to table uid.
//
// The SMA ids come from metaGetSmaIdsByTable(); each id is resolved to its
// entry and the STSma is copied into the result. Ids whose entry cannot be
// read are skipped with a warning. With deepCopy, `expr` and `tagsFilter` are
// duplicated into freshly allocated buffers; without it both pointers are
// NULL and both lengths are 0 in the result.
//
// Returns a wrapper holding the entries that were read (number is the count
// actually filled), or NULL when the table has no SMA ids, no entry could be
// read, or an allocation fails (terrno = TSDB_CODE_OUT_OF_MEMORY).
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
  STSmaWrapper *pSW = NULL;
  SArray       *pSmaIds = NULL;
  // Declared and zeroed up front: _err clears the reader, and the early
  // failure paths reach _err before metaReaderInit() has run.
  SMetaReader mr = {0};

  if (!(pSmaIds = metaGetSmaIdsByTable(pMeta, uid))) {
    return NULL;
  }

  pSW = taosMemoryCalloc(1, sizeof(*pSW));
  if (!pSW) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pSW->number = taosArrayGetSize(pSmaIds);
  pSW->tSma = taosMemoryCalloc(pSW->number, sizeof(STSma));

  if (!pSW->tSma) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  metaReaderInit(&mr, pMeta, 0);

  int smaIdx = 0;
  for (int i = 0; i < pSW->number; ++i) {
    int64_t smaId = *(tb_uid_t *)taosArrayGet(pSmaIds, i);
    if (metaGetTableEntryByUid(&mr, smaId) < 0) {
      tDecoderClear(&mr.coder);
      metaWarn("vgId:%d, no entry for tbId:%" PRIi64 ", smaId:%" PRIi64, TD_VID(pMeta->pVnode), uid, smaId);
      continue;
    }

    const STSma *pSrc = mr.me.smaEntry.tsma;
    STSma       *pTSma = pSW->tSma + smaIdx;
    memcpy(pTSma, pSrc, sizeof(STSma));

    // The struct copy carried over pointers owned by the reader; never leave
    // them in the output, so the error path cannot free borrowed memory.
    pTSma->expr = NULL;
    pTSma->tagsFilter = NULL;

    if (deepCopy) {
      if (pTSma->exprLen > 0) {
        if (!(pTSma->expr = taosMemoryCalloc(1, pTSma->exprLen))) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          goto _err;
        }
        memcpy((void *)pTSma->expr, pSrc->expr, pTSma->exprLen);
      }
      if (pTSma->tagsFilterLen > 0) {
        if (!(pTSma->tagsFilter = taosMemoryCalloc(1, pTSma->tagsFilterLen))) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          goto _err;
        }
        memcpy((void *)pTSma->tagsFilter, pSrc->tagsFilter, pTSma->tagsFilterLen);
      }
    } else {
      pTSma->exprLen = 0;
      pTSma->tagsFilterLen = 0;
    }

    // Release the decoder only after the entry has been copied.
    // NOTE(review): the decoded entry presumably lives in decoder-owned
    // memory — confirm against metaDecodeEntry().
    tDecoderClear(&mr.coder);

    ++smaIdx;
  }

  if (smaIdx <= 0) goto _err;
  pSW->number = smaIdx;

  metaReaderClear(&mr);
  taosArrayDestroy(pSmaIds);
  return pSW;

_err:
  metaReaderClear(&mr);
  taosArrayDestroy(pSmaIds);
  tFreeTSmaWrapper(pSW, deepCopy);
  return NULL;
}

C
Cary Xu 已提交
875
// Return a heap-allocated copy of the STSma stored for indexUid.
//
// The copy is a plain struct copy (memcpy of sizeof(STSma)), made before the
// temporary reader is cleared.
// NOTE(review): pointer members are copied as-is; confirm they are still
// valid for the caller once the reader has been cleared.
//
// Returns NULL when the entry cannot be read (a warning is logged) or when the
// allocation fails (terrno = TSDB_CODE_OUT_OF_MEMORY).
STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
  SMetaReader reader = {0};
  STSma      *pResult = NULL;

  metaReaderInit(&reader, pMeta, 0);

  if (metaGetTableEntryByUid(&reader, indexUid) < 0) {
    metaWarn("vgId:%d, failed to get table entry for smaId:%" PRIi64, TD_VID(pMeta->pVnode), indexUid);
  } else {
    pResult = (STSma *)taosMemoryMalloc(sizeof(STSma));
    if (pResult == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(pResult, reader.me.smaEntry.tsma, sizeof(STSma));
    }
  }

  metaReaderClear(&reader);
  return pResult;
}

C
Cary Xu 已提交
897
// Collect the smaUid of every SMA index key the cursor yields for table uid.
//
// The result array is created lazily on the first hit, so NULL is returned
// both when nothing is found and on failure; failures additionally set terrno
// to TSDB_CODE_OUT_OF_MEMORY (array creation or push failure). NULL is also
// returned when the cursor cannot be opened.
SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) {
  SMSmaCursor *pCursor = metaOpenSmaCursor(pMeta, uid);
  if (pCursor == NULL) {
    return NULL;
  }

  SArray *pSmaUids = NULL;

  for (;;) {
    if (metaSmaCursorNext(pCursor) == 0) {
      break;  // end of this table's keys
    }

    if (pSmaUids == NULL) {
      pSmaUids = taosArrayInit(16, sizeof(tb_uid_t));
      if (pSmaUids == NULL) {
        goto _oom;
      }
    }

    // The cursor keeps the current key; the SMA's own uid is its smaUid field.
    SSmaIdxKey *pIdxKey = (SSmaIdxKey *)pCursor->pKey;
    if (!taosArrayPush(pSmaUids, &pIdxKey->smaUid)) {
      goto _oom;
    }
  }

  metaCloseSmaCursor(pCursor);
  return pSmaUids;

_oom:
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  metaCloseSmaCursor(pCursor);
  if (pSmaUids != NULL) {
    taosArrayDestroy(pSmaUids);
  }
  return NULL;
}

C
Cary Xu 已提交
935
// Collect the distinct table uids yielded by an SMA cursor opened at uid 0.
// Consecutive keys with the same table uid are reported once.
//
// The result array is created lazily on the first hit, so NULL is returned
// both when nothing is found and on failure; failures additionally set terrno
// to TSDB_CODE_OUT_OF_MEMORY (array creation or push failure). NULL is also
// returned when the cursor cannot be opened.
SArray *metaGetSmaTbUids(SMeta *pMeta) {
  SMSmaCursor *pCursor = metaOpenSmaCursor(pMeta, 0);
  if (pCursor == NULL) {
    return NULL;
  }

  SArray  *pTbUids = NULL;
  tb_uid_t prevUid = 0;

  for (;;) {
    tb_uid_t uid = metaSmaCursorNext(pCursor);
    if (uid == 0) {
      break;
    }

    if (uid == prevUid) {
      continue;  // same table as the previous key
    }
    prevUid = uid;

    if (pTbUids == NULL) {
      pTbUids = taosArrayInit(16, sizeof(tb_uid_t));
      if (pTbUids == NULL) {
        goto _oom;
      }
    }

    if (!taosArrayPush(pTbUids, &uid)) {
      goto _oom;
    }
  }

  metaCloseSmaCursor(pCursor);
  return pTbUids;

_oom:
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  metaCloseSmaCursor(pCursor);
  if (pTbUids != NULL) {
    taosArrayDestroy(pTbUids);
  }
  return NULL;
}

L
Liu Jicong 已提交
978
#endif
H
Hongze Cheng 已提交
979

980
/*
 * Look up a tag value inside an encoded tag blob.
 *
 * pTag  encoded tag data, treated as an STag
 * type  data type of the requested tag
 * val   in/out: handed to tTagGet, which fills it when the tag is present
 *
 * Returns pTag itself when type is TSDB_DATA_TYPE_JSON, val when tTagGet finds
 * the tag, and NULL otherwise.
 */
const void *metaGetTableTagVal(void *pTag, int16_t type, STagVal *val) {
  STag *tag = (STag *)pTag;
  if (type == TSDB_DATA_TYPE_JSON) {
    return tag;
  }

  bool find = tTagGet(tag, val);
  if (!find) {
    return NULL;
  }

#ifdef TAG_FILTER_DEBUG
  // debug-only dump of the matched value followed by every value in the tag blob
  if (IS_VAR_DATA_TYPE(val->type)) {
    char *buf = taosMemoryCalloc(val->nData + 1, 1);
    if (buf != NULL) {  // skip the dump rather than memcpy into NULL
      memcpy(buf, val->pData, val->nData);
      metaDebug("metaTag table val varchar index:%d cid:%d type:%d value:%s", 1, val->cid, val->type, buf);
      taosMemoryFree(buf);
    }
  } else {
    double dval = 0;
    GET_TYPED_DATA(dval, double, val->type, &val->i64);
    metaDebug("metaTag table val number index:%d cid:%d type:%d value:%f", 1, val->cid, val->type, dval);
  }

  SArray *pTagVals = NULL;
  tTagToValArray((STag *)pTag, &pTagVals);
  for (int i = 0; i < taosArrayGetSize(pTagVals); i++) {
    STagVal *pTagVal = (STagVal *)taosArrayGet(pTagVals, i);

    if (IS_VAR_DATA_TYPE(pTagVal->type)) {
      char *buf = taosMemoryCalloc(pTagVal->nData + 1, 1);
      if (buf == NULL) {
        continue;
      }
      memcpy(buf, pTagVal->pData, pTagVal->nData);
      metaDebug("metaTag table varchar index:%d cid:%d type:%d value:%s", i, pTagVal->cid, pTagVal->type, buf);
      taosMemoryFree(buf);
    } else {
      double dval = 0;
      GET_TYPED_DATA(dval, double, pTagVal->type, &pTagVal->i64);
      metaDebug("metaTag table number index:%d cid:%d type:%d value:%f", i, pTagVal->cid, pTagVal->type, dval);
    }
  }
  // the array is only needed for the dump above; release it so debug builds do not leak per call
  taosArrayDestroy(pTagVals);
#endif

  return val;
}
wmmhello's avatar
wmmhello 已提交
1023 1024

typedef struct {
H
Hongze Cheng 已提交
1025 1026
  SMeta   *pMeta;
  TBC     *pCur;
wmmhello's avatar
wmmhello 已提交
1027 1028 1029
  tb_uid_t suid;
  int16_t  cid;
  int16_t  type;
H
Hongze Cheng 已提交
1030 1031
  void    *pKey;
  void    *pVal;
wmmhello's avatar
wmmhello 已提交
1032 1033 1034 1035
  int32_t  kLen;
  int32_t  vLen;
} SIdxCursor;

dengyihao's avatar
dengyihao 已提交
1036 1037 1038
/*
 * Walk pMeta->pTagIdx starting at the key built from (param->suid, param->cid,
 * param->val) and append to pUids the uid of every entry for which
 * param->filterFunc returns 0.
 *
 * pMeta  meta handle; locked with metaRLock for the duration of the scan
 * param  filter description: suid, cid, type, val, reverse and filterFunc are read.
 *        filterFunc returns 0 for a match, 1 for "no match, keep scanning" and
 *        any other value for "no match, stop scanning"
 * pUids  output array of tb_uid_t, appended to
 *
 * Returns 0 when the scan completes (also when nothing matches) and a non-zero
 * value when param->val is NULL, an allocation fails, the NCHAR filter value
 * cannot be converted, or the cursor/key cannot be set up.
 */
int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
  int32_t     ret = 0;
  char       *buf = NULL;
  STagIdxKey *pKey = NULL;
  int32_t     nKey = 0;
  int32_t     maxSize = 0;
  int32_t     nTagData = 0;
  void       *tagData = NULL;

  // reject the request before anything is allocated or locked, so this path has nothing to undo
  if (param->val == NULL) {
    metaError("vgId:%d, failed to filter NULL data", TD_VID(pMeta->pVnode));
    return -1;
  }

  SIdxCursor *pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor));
  if (pCursor == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pCursor->pMeta = pMeta;
  pCursor->suid = param->suid;
  pCursor->cid = param->cid;
  pCursor->type = param->type;

  // from here on every exit goes through END, which releases the lock and the cursor
  metaRLock(pMeta);
  ret = tdbTbcOpen(pMeta->pTagIdx, &pCursor->pCur, NULL);
  if (ret < 0) {
    goto END;
  }

  if (IS_VAR_DATA_TYPE(param->type)) {
    tagData = varDataVal(param->val);
    nTagData = varDataLen(param->val);

    if (param->type == TSDB_DATA_TYPE_NCHAR) {
      // the filter value is converted with taosMbsToUcs4 before the key is built
      maxSize = 4 * nTagData + 1;
      buf = taosMemoryCalloc(1, maxSize);
      if (buf == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        ret = -1;
        goto END;
      }
      if (false == taosMbsToUcs4(tagData, nTagData, (TdUcs4 *)buf, maxSize, &maxSize)) {
        // a value that cannot be converted is a failure, not an empty result
        ret = -1;
        goto END;
      }

      tagData = buf;
      nTagData = maxSize;
    }
  } else {
    tagData = param->val;
    nTagData = tDataTypes[param->type].bytes;
  }

  ret = metaCreateTagIdxKey(pCursor->suid, pCursor->cid, tagData, nTagData, pCursor->type,
                            param->reverse ? INT64_MAX : INT64_MIN, &pKey, &nKey);
  if (ret != 0) {
    goto END;
  }

  int moveCmp = 0;
  if (tdbTbcMoveTo(pCursor->pCur, pKey, nKey, &moveCmp) < 0) {
    // NOTE(review): kept as in the original -- the function still returns 0 here; confirm
    // whether a negative return from tdbTbcMoveTo should be reported as an error
    goto END;
  }

  bool first = true;
  while (1) {
    void   *entryKey = NULL, *entryVal = NULL;
    int32_t nEntryKey = 0, nEntryVal = 0;

    if (tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, (const void **)&entryVal, &nEntryVal) < 0) {
      break;
    }

    STagIdxKey *p = entryKey;
    if (p == NULL) {
      break;
    }

    if (p->type != pCursor->type) {
      // until the first entry of the requested type has been seen, step over entries of
      // another type; afterwards a type change ends the scan
      if (!first) {
        break;
      }
      if ((param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur)) < 0) {
        break;
      }
      continue;
    }
    if (p->suid != pKey->suid) {
      break;
    }
    first = false;

    int32_t match = (*param->filterFunc)(p->data, pKey->data, pKey->type);
    if (match == 0) {
      // the table uid is stored directly behind the tag value inside the key
      tb_uid_t tuid = 0;
      if (IS_VAR_DATA_TYPE(pKey->type)) {
        tuid = *(tb_uid_t *)(p->data + varDataTLen(p->data));
      } else {
        tuid = *(tb_uid_t *)(p->data + tDataTypes[pCursor->type].bytes);
      }
      if (taosArrayPush(pUids, &tuid) == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        ret = -1;
        break;
      }
    } else if (match != 1) {
      // not a match and no further entry can match
      break;
    }
    // match == 1: not a match, but later entries may still match

    if ((param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur)) < 0) {
      break;
    }
  }

END:
  if (pCursor->pMeta) metaULock(pCursor->pMeta);
  if (pCursor->pCur) tdbTbcClose(pCursor->pCur);
  taosMemoryFree(buf);
  taosMemoryFree(pKey);

  taosMemoryFree(pCursor);

  return ret;
}
H
Hongze Cheng 已提交
1152

dengyihao's avatar
dengyihao 已提交
1153
/*
 * Fetch the value stored in pMeta->pCtbIdx under the key (suid, uid).
 *
 * tag/len  passed through to tdbTbGet, which stores the value and its length there
 * lock     when true the lookup is wrapped in metaRLock/metaULock; when false no
 *          locking is done here
 *
 * Returns the result of tdbTbGet.
 */
static int32_t metaGetTableTagByUid(SMeta *pMeta, int64_t suid, int64_t uid, void **tag, int32_t *len, bool lock) {
  SCtbIdxKey key = {.suid = suid, .uid = uid};

  if (!lock) {
    return tdbTbGet(pMeta->pCtbIdx, &key, sizeof(SCtbIdxKey), tag, len);
  }

  metaRLock(pMeta);
  int code = tdbTbGet(pMeta->pCtbIdx, &key, sizeof(SCtbIdxKey), tag, len);
  metaULock(pMeta);

  return code;
}
dengyihao's avatar
dengyihao 已提交
1167
/*
 * For every uid in uidList that is not yet a key of `tags`, look up its tag data
 * with metaGetTableTagByUid and insert a copy into `tags`, keyed by the uid.
 *
 * The meta lock is taken with metaRLock and dropped and re-taken every LIMIT
 * uids. A uid whose lookup fails is logged and skipped.
 *
 * Always returns 0. A NULL uidList is treated as empty.
 */
int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList, SHashObj *tags) {
  const int32_t LIMIT = 128;

  int32_t nUids = uidList ? taosArrayGetSize(uidList) : 0;
  bool    locked = false;

  for (int idx = 0; idx < nUids; ++idx) {
    tb_uid_t *pUid = taosArrayGet(uidList, idx);

    // start of a new batch: release the lock held for the previous batch, then take it again
    if (idx % LIMIT == 0) {
      if (locked) {
        metaULock(pMeta);
      }
      metaRLock(pMeta);
      locked = true;
    }

    if (taosHashGet(tags, pUid, sizeof(tb_uid_t)) != NULL) {
      continue;  // tag data for this uid is already present
    }

    void   *pTagBuf = NULL;
    int32_t nTagBuf = 0;
    // the lock is already held by this function, so the helper must not lock again
    if (metaGetTableTagByUid(pMeta, suid, *pUid, &pTagBuf, &nTagBuf, false) != 0) {
      metaError("vgId:%d, failed to table IDs, suid: %" PRId64 ", uid: %" PRId64 "", TD_VID(pMeta->pVnode), suid,
                *pUid);
      continue;
    }

    taosHashPut(tags, pUid, sizeof(tb_uid_t), pTagBuf, nTagBuf);
    tdbFree(pTagBuf);
  }

  if (locked) {
    metaULock(pMeta);
  }

  return 0;
}
dengyihao's avatar
dengyihao 已提交
1198

1199
/*
 * Iterate the child-table cursor of super table `suid` and store each visited
 * table's cursor value (pCur->pVal, pCur->vLen) in `tags`, keyed by table uid.
 *
 * uidList  in/out. When it already holds uids, only those tables are stored.
 *          When it is empty, every table the cursor yields is stored and its
 *          uid is appended to uidList.
 * tags     output hash, uid -> cursor value
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the cursor or the
 * lookup hash cannot be created.
 */
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj *tags) {
  SMCtbCursor *pCur = metaOpenCtbCursor(pMeta, suid, 1);
  if (pCur == NULL) {
    // NOTE(review): the open call gives no failure reason here; OOM is reported -- confirm
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SHashObj *uHash = NULL;
  size_t    len = taosArrayGetSize(uidList);  // len > 0 means the caller already supplied uids
  if (len > 0) {
    uHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
    if (uHash == NULL) {
      // without the lookup hash every table would be filtered out silently
      metaCloseCtbCursor(pCur, 1);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    for (int i = 0; (size_t)i < len; i++) {
      int64_t *uid = taosArrayGet(uidList, i);
      taosHashPut(uHash, uid, sizeof(int64_t), &i, sizeof(i));
    }
  }

  while (1) {
    tb_uid_t id = metaCtbCursorNext(pCur);
    if (id == 0) {  // 0 ends the iteration
      break;
    }

    if (len > 0 && taosHashGet(uHash, &id, sizeof(int64_t)) == NULL) {
      continue;  // not one of the requested tables
    } else if (len == 0) {
      taosArrayPush(uidList, &id);
    }

    taosHashPut(tags, &id, sizeof(int64_t), pCur->pVal, pCur->vLen);
  }

  taosHashCleanup(uHash);
  metaCloseCtbCursor(pCur, 1);
  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
1230

H
Hongze Cheng 已提交
1231 1232
int32_t metaCacheGet(SMeta *pMeta, int64_t uid, SMetaInfo *pInfo);

/*
 * Fill pInfo for table `uid`: first from the cache via metaCacheGet, otherwise
 * from pMeta->pUidIdx. A value read from pUidIdx is also written to the cache.
 *
 * Both lookups run under metaRLock; the cache update runs under metaWLock after
 * that lock has been released.
 *
 * Returns 0 on success and TSDB_CODE_NOT_FOUND when neither lookup succeeds.
 */
int32_t metaGetInfo(SMeta *pMeta, int64_t uid, SMetaInfo *pInfo) {
  int32_t code = 0;
  void   *pVal = NULL;
  int     nVal = 0;

  metaRLock(pMeta);
  // the index is consulted only when the cache lookup fails
  bool inCache = (metaCacheGet(pMeta, uid, pInfo) == 0);
  bool inIndex = !inCache && !(tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pVal, &nVal) < 0);
  metaULock(pMeta);

  if (inIndex) {
    const SUidIdxVal *pIdxVal = (const SUidIdxVal *)pVal;

    pInfo->uid = uid;
    pInfo->suid = pIdxVal->suid;
    pInfo->version = pIdxVal->version;
    pInfo->skmVer = pIdxVal->skmVer;

    // remember the result in the cache
    metaWLock(pMeta);
    metaCacheUpsert(pMeta, pInfo);
    metaULock(pMeta);
  } else if (!inCache) {
    code = TSDB_CODE_NOT_FOUND;
  }

  tdbFree(pVal);
  return code;
}
1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298

/*
 * Fill pInfo with the statistics of super table `uid`.
 *
 * The stats cache is tried first via metaStatsCacheGet. On a miss the child
 * table count is obtained with vnodeGetCtbNum, stored in pInfo together with
 * the uid, and written to the cache under metaWLock.
 *
 * Always returns 0.
 */
int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo) {
  int64_t nCtb = 0;

  metaRLock(pMeta);
  bool cacheHit = (metaStatsCacheGet(pMeta, uid, pInfo) == TSDB_CODE_SUCCESS);
  if (!cacheHit) {
    // slow path, still under the lock taken above
    vnodeGetCtbNum(pMeta->pVnode, uid, &nCtb);
  }
  metaULock(pMeta);

  if (!cacheHit) {
    pInfo->uid = uid;
    pInfo->ctbNum = nCtb;

    // store the freshly computed stats in the cache
    metaWLock(pMeta);
    metaStatsCacheUpsert(pMeta, pInfo);
    metaULock(pMeta);
  }

  return 0;
}