metaQuery.c 31.9 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "meta.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
// Prepare `pReader` for lookups against `pMeta`.
//   pReader : reader to initialise; it is zeroed before anything else is set
//   pMeta   : meta handle the reader is bound to
//   flags   : reader flags; unless META_READER_NOLOCK is set, the meta read
//             lock is acquired here (see metaReaderClear / metaReaderReleaseLock)
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) {
  const bool needLock = (flags & META_READER_NOLOCK) == 0;

  memset(pReader, 0, sizeof(*pReader));
  pReader->pMeta = pMeta;
  pReader->flags = flags;

  if (needLock) {
    metaRLock(pMeta);
  }
}
H
Hongze Cheng 已提交
26

27 28 29 30 31 32 33
// Release the meta read lock held by `pReader`, if any, and mark the reader
// as lock-free (META_READER_NOLOCK) so that a later metaReaderClear does not
// unlock a second time.
void metaReaderReleaseLock(SMetaReader *pReader) {
  if (pReader->pMeta == NULL) {
    return;
  }
  if (pReader->flags & META_READER_NOLOCK) {
    return;
  }

  metaULock(pReader->pMeta);
  pReader->flags |= META_READER_NOLOCK;
}

H
Hongze Cheng 已提交
34
// Tear down a reader: drop the meta read lock when the reader still holds it
// (bound to a meta and META_READER_NOLOCK not set), then clear the decoder
// and free the value buffer.
void metaReaderClear(SMetaReader *pReader) {
  const bool holdsLock = (pReader->pMeta != NULL) && ((pReader->flags & META_READER_NOLOCK) == 0);

  if (holdsLock) {
    metaULock(pReader->pMeta);
  }

  tDecoderClear(&pReader->coder);
  tdbFree(pReader->pBuf);
}

H
Hongze Cheng 已提交
42
// Load and decode the table entry stored in table.db under (version, uid).
// The raw value is kept in pReader->pBuf / szBuf and the decoded entry in
// pReader->me.
// Returns 0 on success, -1 on failure. When the key is absent, terrno is set
// to TSDB_CODE_PAR_TABLE_NOT_EXIST.
int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t uid) {
  SMeta   *pMeta = pReader->pMeta;
  STbDbKey key = {.version = version, .uid = uid};

  // query table.db
  if (tdbTbGet(pMeta->pTbDb, &key, sizeof(key), &pReader->pBuf, &pReader->szBuf) < 0) {
    terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
    return -1;
  }

  // decode the entry
  tDecoderInit(&pReader->coder, pReader->pBuf, pReader->szBuf);
  if (metaDecodeEntry(&pReader->coder, &pReader->me) < 0) {
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
65
// int metaGetTableEntryByUidTest(void* meta, SArray *uidList) {
wmmhello's avatar
wmmhello 已提交
66
//
dengyihao's avatar
dengyihao 已提交
67 68 69 70 71
//   SArray* readerList = taosArrayInit(taosArrayGetSize(uidList), sizeof(SMetaReader));
//   SArray* uidVersion = taosArrayInit(taosArrayGetSize(uidList), sizeof(STbDbKey));
//   SMeta  *pMeta = meta;
//   int64_t version;
//   SHashObj *uHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
wmmhello's avatar
wmmhello 已提交
72
//
dengyihao's avatar
dengyihao 已提交
73 74 75 76 77 78 79 80 81 82
//   int64_t stt1 = taosGetTimestampUs();
//   for(int i = 0; i < taosArrayGetSize(uidList); i++) {
//     void* ppVal = NULL;
//     int vlen = 0;
//     uint64_t *  uid = taosArrayGet(uidList, i);
//     // query uid.idx
//     if (tdbTbGet(pMeta->pUidIdx, uid, sizeof(*uid), &ppVal, &vlen) < 0) {
//       continue;
//     }
//     version = *(int64_t *)ppVal;
wmmhello's avatar
wmmhello 已提交
83
//
dengyihao's avatar
dengyihao 已提交
84 85 86 87 88 89
//     STbDbKey tbDbKey = {.version = version, .uid = *uid};
//     taosArrayPush(uidVersion, &tbDbKey);
//     taosHashPut(uHash, uid, sizeof(int64_t), ppVal, sizeof(int64_t));
//   }
//   int64_t stt2 = taosGetTimestampUs();
//   qDebug("metaGetTableEntryByUidTest1 rows:%d, cost:%ld us", taosArrayGetSize(uidList), stt2-stt1);
wmmhello's avatar
wmmhello 已提交
90
//
dengyihao's avatar
dengyihao 已提交
91 92 93 94 95
//   TBC        *pCur = NULL;
//   tdbTbcOpen(pMeta->pTbDb, &pCur, NULL);
//   tdbTbcMoveToFirst(pCur);
//   void *pKey = NULL;
//   int   kLen = 0;
wmmhello's avatar
wmmhello 已提交
96
//
dengyihao's avatar
dengyihao 已提交
97 98 99 100 101 102 103 104 105 106
//   while(1){
//     SMetaReader pReader = {0};
//     int32_t ret = tdbTbcNext(pCur, &pKey, &kLen, &pReader.pBuf, &pReader.szBuf);
//     if (ret < 0) break;
//     STbDbKey *tmp = (STbDbKey*)pKey;
//     int64_t *ver = (int64_t*)taosHashGet(uHash, &tmp->uid, sizeof(int64_t));
//     if(ver == NULL || *ver != tmp->version) continue;
//     taosArrayPush(readerList, &pReader);
//   }
//   tdbTbcClose(pCur);
wmmhello's avatar
wmmhello 已提交
107
//
dengyihao's avatar
dengyihao 已提交
108 109 110 111 112
//   taosArrayClear(readerList);
//   int64_t stt3 = taosGetTimestampUs();
//   qDebug("metaGetTableEntryByUidTest2 rows:%d, cost:%ld us", taosArrayGetSize(uidList), stt3-stt2);
//   for(int i = 0; i < taosArrayGetSize(uidVersion); i++) {
//     SMetaReader pReader = {0};
wmmhello's avatar
wmmhello 已提交
113
//
dengyihao's avatar
dengyihao 已提交
114 115 116 117 118 119 120 121 122
//     STbDbKey *tbDbKey = taosArrayGet(uidVersion, i);
//     // query table.db
//     if (tdbTbGet(pMeta->pTbDb, tbDbKey, sizeof(STbDbKey), &pReader.pBuf, &pReader.szBuf) < 0) {
//       continue;
//     }
//     taosArrayPush(readerList, &pReader);
//   }
//   int64_t stt4 = taosGetTimestampUs();
//   qDebug("metaGetTableEntryByUidTest3 rows:%d, cost:%ld us", taosArrayGetSize(uidList), stt4-stt3);
wmmhello's avatar
wmmhello 已提交
123
//
dengyihao's avatar
dengyihao 已提交
124 125 126 127 128
//   for(int i = 0; i < taosArrayGetSize(readerList); i++){
//     SMetaReader* pReader  = taosArrayGet(readerList, i);
//     metaReaderInit(pReader, meta, 0);
//     // decode the entry
//     tDecoderInit(&pReader->coder, pReader->pBuf, pReader->szBuf);
wmmhello's avatar
wmmhello 已提交
129
//
dengyihao's avatar
dengyihao 已提交
130 131 132 133 134 135 136 137
//     if (metaDecodeEntry(&pReader->coder, &pReader->me) < 0) {
//     }
//     metaReaderClear(pReader);
//   }
//   int64_t stt5 = taosGetTimestampUs();
//   qDebug("metaGetTableEntryByUidTest4 rows:%d, cost:%ld us", taosArrayGetSize(readerList), stt5-stt4);
//   return 0;
// }
wmmhello's avatar
wmmhello 已提交
138

H
Hongze Cheng 已提交
139
// Report whether `uid` has a record in uid.idx. The lookup runs under the
// meta read lock, which is taken and released inside this function.
bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid) {
  bool found;

  metaRLock(pMeta);
  // query uid.idx; only the presence of the key matters, so no value is fetched
  found = (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), NULL, NULL) >= 0);
  metaULock(pMeta);

  return found;
}

H
Hongze Cheng 已提交
154
// Resolve `uid` through uid.idx to an entry version, then load that entry via
// metaGetTableEntryByVersion.
// Returns 0 on success, -1 on failure (terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST
// when the uid is not in uid.idx).
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
  SMeta *pMeta = pReader->pMeta;

  // query uid.idx
  if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pReader->pBuf, &pReader->szBuf) < 0) {
    terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
    return -1;
  }

  // Copy the version out first: the next lookup reuses pReader->pBuf.
  SUidIdxVal *pIdxVal = (SUidIdxVal *)pReader->pBuf;
  int64_t     entryVer = pIdxVal[0].version;

  return metaGetTableEntryByVersion(pReader, entryVer, uid);
}

H
Hongze Cheng 已提交
168
// Resolve a table name through name.idx to its uid, then load the entry via
// metaGetTableEntryByUid. The key includes the terminating NUL of `name`.
// Returns 0 on success, -1 on failure (terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST
// when the name is not in name.idx).
int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
  SMeta *pMeta = pReader->pMeta;

  // query name.idx
  if (tdbTbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pReader->pBuf, &pReader->szBuf) < 0) {
    terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
    return -1;
  }

  // Copy the uid out first: the next lookup reuses pReader->pBuf.
  tb_uid_t tbUid = *(tb_uid_t *)pReader->pBuf;

  return metaGetTableEntryByUid(pReader, tbUid);
}

H
Hongze Cheng 已提交
182
// Look up the uid stored in name.idx for `name` (key includes the trailing
// NUL). Takes and releases the meta read lock itself.
// Returns the uid, or 0 when the name is not found.
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {
  tb_uid_t result = 0;
  void    *pVal = NULL;
  int      vLen = 0;

  metaRLock(pMeta);

  int rc = tdbTbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pVal, &vLen);
  if (rc == 0) {
    result = *(tb_uid_t *)pVal;
    tdbFree(pVal);
  }

  metaULock(pMeta);

  return result;
}

dengyihao's avatar
dengyihao 已提交
199
int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName) {
200
  int         code = 0;
D
dapan1121 已提交
201
  SMetaReader mr = {0};
dengyihao's avatar
dengyihao 已提交
202
  metaReaderInit(&mr, (SMeta *)meta, 0);
203 204 205 206 207
  code = metaGetTableEntryByUid(&mr, uid);
  if (code < 0) {
    metaReaderClear(&mr);
    return -1;
  }
D
dapan1121 已提交
208 209 210 211 212 213

  STR_TO_VARSTR(tbName, mr.me.name);
  metaReaderClear(&mr);

  return 0;
}
dengyihao's avatar
dengyihao 已提交
214
int metaGetTableUidByName(void *meta, char *tbName, int64_t *uid) {
dengyihao's avatar
dengyihao 已提交
215 216 217
  int         code = 0;
  SMetaReader mr = {0};
  metaReaderInit(&mr, (SMeta *)meta, 0);
dengyihao's avatar
dengyihao 已提交
218 219 220 221

  SMetaReader *pReader = &mr;

  // query name.idx
dengyihao's avatar
dengyihao 已提交
222
  if (tdbTbGet(pReader->pMeta->pNameIdx, tbName, strlen(tbName) + 1, &pReader->pBuf, &pReader->szBuf) < 0) {
dengyihao's avatar
dengyihao 已提交
223
    terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
dengyihao's avatar
dengyihao 已提交
224 225 226 227
    metaReaderClear(&mr);
    return -1;
  }

dengyihao's avatar
dengyihao 已提交
228
  *uid = *(tb_uid_t *)pReader->pBuf;
dengyihao's avatar
dengyihao 已提交
229

dengyihao's avatar
dengyihao 已提交
230 231 232 233
  metaReaderClear(&mr);

  return 0;
}
D
dapan1121 已提交
234

dengyihao's avatar
dengyihao 已提交
235 236 237 238 239
// Store in `*tbType` the type of the table named `tbName`.
// Returns the result of metaGetTableEntryByName (0 on success); `*tbType` is
// only written on success.
int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType) {
  SMetaReader reader = {0};
  metaReaderInit(&reader, (SMeta *)meta, 0);

  int rc = metaGetTableEntryByName(&reader, tbName);
  if (rc == 0) {
    *tbType = reader.me.type;
  }

  metaReaderClear(&reader);
  return rc;
}

H
Hongze Cheng 已提交
247
// Placeholder: advancing a reader is not implemented yet. Always returns 0.
int metaReadNext(SMetaReader *pReader) {
  SMeta *pMeta = pReader->pMeta;
  (void)pMeta;  // TODO: not used until the iteration logic exists

  return 0;
}

#if 1  // ===================================================
H
Hongze Cheng 已提交
256 257 258 259 260 261 262 263
// Create a cursor that walks uid.idx from its first record.
// The embedded reader is initialised with flags 0, so the meta read lock is
// taken here and stays held until metaCloseTbCursor.
// Returns the cursor, or NULL when allocation or opening the tdb cursor fails
// (the lock is released again on that path).
SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
  SMTbCursor *pTbCur = (SMTbCursor *)taosMemoryCalloc(1, sizeof(*pTbCur));
  if (pTbCur == NULL) {
    return NULL;
  }

  metaReaderInit(&pTbCur->mr, pMeta, 0);

  // Same failure handling as the other cursor openers in this file: do not
  // hand out a cursor whose tdb cursor could not be opened.
  if (tdbTbcOpen(pMeta->pUidIdx, &pTbCur->pDbc, NULL) < 0) {
    metaReaderClear(&pTbCur->mr);
    taosMemoryFree(pTbCur);
    return NULL;
  }

  tdbTbcMoveToFirst(pTbCur->pDbc);

  return pTbCur;
}

// Destroy a cursor created by metaOpenTbCursor: free the key/value buffers,
// clear the embedded reader (which releases its lock), close the tdb cursor
// if one is open, and free the cursor itself. NULL is accepted and ignored.
void metaCloseTbCursor(SMTbCursor *pTbCur) {
  if (pTbCur == NULL) {
    return;
  }

  tdbFree(pTbCur->pKey);
  tdbFree(pTbCur->pVal);
  metaReaderClear(&pTbCur->mr);

  if (pTbCur->pDbc != NULL) {
    tdbTbcClose(pTbCur->pDbc);
  }

  taosMemoryFree(pTbCur);
}

H
Hongze Cheng 已提交
285
// Advance the cursor to the next table that is not a super table and leave
// its decoded entry in pTbCur->mr.me.
// Returns 0 when such an entry was loaded, -1 when the index is exhausted or
// an entry could not be loaded.
int metaTbCursorNext(SMTbCursor *pTbCur) {
  for (;;) {
    if (tdbTbcNext(pTbCur->pDbc, &pTbCur->pKey, &pTbCur->kLen, &pTbCur->pVal, &pTbCur->vLen) < 0) {
      return -1;
    }

    // Drop what the previous entry's decode left in the decoder before it is
    // initialised again by the lookup below.
    tDecoderClear(&pTbCur->mr.coder);

    // A failed lookup must not be judged on the previous entry's type, nor be
    // handed back to the caller as if it were the current record.
    if (metaGetTableEntryByVersion(&pTbCur->mr, ((SUidIdxVal *)pTbCur->pVal)[0].version,
                                   *(tb_uid_t *)pTbCur->pKey) < 0) {
      return -1;
    }

    if (pTbCur->mr.me.type != TSDB_SUPER_TABLE) {
      return 0;
    }
    // super tables are skipped
  }
}

309
// Return a heap copy (tCloneSSchemaWrapper) of the schema of table `uid`.
//   sver : wanted schema version; -1 accepts whatever the current entry holds
//   lock : non-zero to take/release the meta read lock inside this function
// Resolution order:
//   1. child table  -> restart the lookup with its super table uid
//   2. super/normal -> use the entry's schemaRow when `sver` is -1 or matches
//   3. super table  -> scan table.db backwards from (sver, INT64_MAX) for an
//                      entry of the same uid and use its schemaRow
//   4. otherwise    -> look (uid, sver) up in the schema db
// Returns NULL when any lookup or decode step fails.
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) {
  void           *pData = NULL;
  int             nData = 0;
  int64_t         version;
  SSchemaWrapper  schema = {0};
  SSchemaWrapper *pSchema = NULL;
  SDecoder        dc = {0};

  if (lock) {
    metaRLock(pMeta);
  }

_query:
  // uid.idx yields the entry version that forms the table.db key
  if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData) < 0) {
    goto _exit;
  }

  version = ((SUidIdxVal *)pData)[0].version;

  // Without this check a failed lookup would decode the uid.idx value that is
  // still sitting in pData.
  if (tdbTbGet(pMeta->pTbDb, &(STbDbKey){.uid = uid, .version = version}, sizeof(STbDbKey), &pData, &nData) < 0) {
    goto _exit;
  }

  SMetaEntry me = {0};
  tDecoderInit(&dc, pData, nData);
  if (metaDecodeEntry(&dc, &me) < 0) {
    goto _exit;
  }

  if (me.type == TSDB_SUPER_TABLE) {
    if (sver == -1 || sver == me.stbEntry.schemaRow.version) {
      pSchema = tCloneSSchemaWrapper(&me.stbEntry.schemaRow);
      goto _exit;
    }

    // Traverse to find the previous qualified data
    TBC *pCur = NULL;
    if (tdbTbcOpen(pMeta->pTbDb, &pCur, NULL) >= 0) {
      STbDbKey key = {.version = sver, .uid = INT64_MAX};
      int      c = 0;
      bool     found = false;

      tdbTbcMoveTo(pCur, &key, sizeof(key), &c);
      if (c < 0) {
        tdbTbcMoveToPrev(pCur);
      }

      void *pKey = NULL;
      void *pVal = NULL;
      int   vLen = 0, kLen = 0;
      while (tdbTbcPrev(pCur, &pKey, &kLen, &pVal, &vLen) >= 0) {
        if (((STbDbKey *)pKey)->uid != uid) {
          continue;
        }

        SDecoder   dcNew = {0};
        SMetaEntry meNew = {0};
        tDecoderInit(&dcNew, pVal, vLen);
        if (metaDecodeEntry(&dcNew, &meNew) >= 0) {
          pSchema = tCloneSSchemaWrapper(&meNew.stbEntry.schemaRow);
          found = true;
        }
        tDecoderClear(&dcNew);

        if (found) {
          break;
        }
        // undecodable record: keep scanning older entries
      }

      tdbFree(pKey);
      tdbFree(pVal);
      tdbTbcClose(pCur);

      if (found) {
        goto _exit;
      }
    }
    // cursor unavailable or nothing found: fall back to the schema db below
  } else if (me.type == TSDB_CHILD_TABLE) {
    // a child table uses its super table's schema
    uid = me.ctbEntry.suid;
    tDecoderClear(&dc);
    goto _query;
  } else {
    if (sver == -1 || sver == me.ntbEntry.schemaRow.version) {
      pSchema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow);
      goto _exit;
    }
  }

  // done with the entry; the decoder is re-initialised for the schema record
  tDecoderClear(&dc);

  // query from skm db
  if (tdbTbGet(pMeta->pSkmDb, &(SSkmDbKey){.uid = uid, .sver = sver}, sizeof(SSkmDbKey), &pData, &nData) < 0) {
    goto _exit;
  }

  tDecoderInit(&dc, pData, nData);
  // NOTE(review): assumes tDecodeSSchemaWrapperEx reports failure with a
  // negative value, like metaDecodeEntry - confirm against its definition.
  if (tDecodeSSchemaWrapperEx(&dc, &schema) < 0) {
    goto _exit;
  }
  pSchema = tCloneSSchemaWrapper(&schema);

_exit:
  // single cleanup path; pSchema is still NULL on every failure route
  tDecoderClear(&dc);
  if (lock) {
    metaULock(pMeta);
  }
  tdbFree(pData);
  return pSchema;
}

H
Hongze Cheng 已提交
412 413 414
// Append to `uidList` the uid of every ttl.idx record reached by walking
// backwards from the position of key (dtime = ttl, uid = INT64_MAX).
// Returns the tdbTbcOpen error when the cursor cannot be opened, 0 otherwise.
int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList) {
  TBC *pCur;
  int  ret = tdbTbcOpen(pMeta->pTtlIdx, &pCur, NULL);
  if (ret < 0) {
    return ret;
  }

  // position the cursor at the search key
  STtlIdxKey ttlKey = {0};
  int        c = 0;
  ttlKey.dtime = ttl;
  ttlKey.uid = INT64_MAX;
  tdbTbcMoveTo(pCur, &ttlKey, sizeof(ttlKey), &c);
  if (c < 0) {
    tdbTbcMoveToPrev(pCur);
  }

  // collect uids until the cursor runs out of records
  void *pKey = NULL;
  int   kLen = 0;
  for (;;) {
    ret = tdbTbcPrev(pCur, &pKey, &kLen, NULL, NULL);
    if (ret < 0) {
      break;
    }

    ttlKey = *(STtlIdxKey *)pKey;
    taosArrayPush(uidList, &ttlKey.uid);
  }

  tdbFree(pKey);
  tdbTbcClose(pCur);
  return 0;
}

H
Hongze Cheng 已提交
443
struct SMCtbCursor {
H
Hongze Cheng 已提交
444 445
  SMeta   *pMeta;
  TBC     *pCur;
H
Hongze Cheng 已提交
446
  tb_uid_t suid;
H
Hongze Cheng 已提交
447 448
  void    *pKey;
  void    *pVal;
H
Hongze Cheng 已提交
449 450 451 452
  int      kLen;
  int      vLen;
};

453
// Open a cursor over the child-table index positioned at the first record of
// super table `uid`.
//   lock : non-zero to take the meta read lock here; it then stays held until
//          metaCloseCtbCursor is called with the same `lock` value
// Returns the cursor, or NULL when allocation or opening the tdb cursor fails.
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid, int lock) {
  SMCtbCursor *pCtbCur = NULL;
  SCtbIdxKey   ctbIdxKey;
  int          ret = 0;
  int          c = 0;

  pCtbCur = (SMCtbCursor *)taosMemoryCalloc(1, sizeof(*pCtbCur));
  if (pCtbCur == NULL) {
    return NULL;
  }

  pCtbCur->pMeta = pMeta;
  pCtbCur->suid = uid;
  if (lock) {
    metaRLock(pMeta);
  }

  ret = tdbTbcOpen(pMeta->pCtbIdx, &pCtbCur->pCur, NULL);
  if (ret < 0) {
    // Only undo the lock when this call actually took it; unlocking
    // unconditionally would release a lock owned by the caller (or nobody).
    if (lock) {
      metaULock(pMeta);
    }
    taosMemoryFree(pCtbCur);
    return NULL;
  }

  // move to the suid
  ctbIdxKey.suid = uid;
  ctbIdxKey.uid = INT64_MIN;
  tdbTbcMoveTo(pCtbCur->pCur, &ctbIdxKey, sizeof(ctbIdxKey), &c);
  if (c > 0) {
    tdbTbcMoveToNext(pCtbCur->pCur);
  }

  return pCtbCur;
}

488
// Destroy a cursor created by metaOpenCtbCursor. When `lock` is non-zero the
// meta read lock is released first. The key/value buffers are freed only when
// a tdb cursor is present. NULL is accepted and ignored.
void metaCloseCtbCursor(SMCtbCursor *pCtbCur, int lock) {
  if (pCtbCur == NULL) {
    return;
  }

  if (pCtbCur->pMeta && lock) {
    metaULock(pCtbCur->pMeta);
  }

  if (pCtbCur->pCur != NULL) {
    tdbTbcClose(pCtbCur->pCur);

    tdbFree(pCtbCur->pKey);
    tdbFree(pCtbCur->pVal);
  }

  taosMemoryFree(pCtbCur);
}

// Fetch the next record of the child-table index.
// Returns the record's child uid, or 0 when the cursor is exhausted or the
// record's suid is greater than the suid the cursor was opened for.
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
  if (tdbTbcNext(pCtbCur->pCur, &pCtbCur->pKey, &pCtbCur->kLen, &pCtbCur->pVal, &pCtbCur->vLen) < 0) {
    return 0;
  }

  SCtbIdxKey *pHit = pCtbCur->pKey;

  return (pHit->suid > pCtbCur->suid) ? 0 : pHit->uid;
}

C
Cary Xu 已提交
519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535
// Cursor over the super-table index (pSuidIdx); see metaOpenStbCursor,
// metaStbCursorNext and metaCloseStbCursor.
struct SMStbCursor {
  SMeta   *pMeta;  // meta handle; read-locked by metaOpenStbCursor, unlocked on close
  TBC     *pCur;   // tdb cursor opened on pMeta->pSuidIdx
  tb_uid_t suid;   // uid given to metaOpenStbCursor, used to position the cursor
  void    *pKey;   // key buffer filled by tdbTbcNext, released with tdbFree
  void    *pVal;   // value buffer filled by tdbTbcNext, released with tdbFree
  int      kLen;   // length of pKey
  int      vLen;   // length of pVal
};

// Open a cursor over the super-table index positioned at `suid`.
// The meta read lock is taken here and stays held until metaCloseStbCursor.
// Returns the cursor, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when the
// allocation or the tdb cursor open fails.
SMStbCursor *metaOpenStbCursor(SMeta *pMeta, tb_uid_t suid) {
  int cmp = 0;

  SMStbCursor *pStbCur = (SMStbCursor *)taosMemoryCalloc(1, sizeof(*pStbCur));
  if (pStbCur == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pStbCur->pMeta = pMeta;
  pStbCur->suid = suid;
  metaRLock(pMeta);

  if (tdbTbcOpen(pMeta->pSuidIdx, &pStbCur->pCur, NULL) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    metaULock(pMeta);
    taosMemoryFree(pStbCur);
    return NULL;
  }

  // move to the suid
  tdbTbcMoveTo(pStbCur->pCur, &suid, sizeof(suid), &cmp);
  if (cmp > 0) {
    tdbTbcMoveToNext(pStbCur->pCur);
  }

  return pStbCur;
}

// Destroy a cursor created by metaOpenStbCursor: release the meta read lock,
// close the tdb cursor and free its key/value buffers when a cursor is
// present, then free the cursor itself. NULL is accepted and ignored.
void metaCloseStbCursor(SMStbCursor *pStbCur) {
  if (pStbCur == NULL) {
    return;
  }

  if (pStbCur->pMeta != NULL) {
    metaULock(pStbCur->pMeta);
  }

  if (pStbCur->pCur != NULL) {
    tdbTbcClose(pStbCur->pCur);

    tdbFree(pStbCur->pKey);
    tdbFree(pStbCur->pVal);
  }

  taosMemoryFree(pStbCur);
}

// Fetch the next record of the super-table index.
// Returns the uid stored in the record's key, or 0 when the cursor is
// exhausted.
tb_uid_t metaStbCursorNext(SMStbCursor *pStbCur) {
  if (tdbTbcNext(pStbCur->pCur, &pStbCur->pKey, &pStbCur->kLen, &pStbCur->pVal, &pStbCur->vLen) < 0) {
    return 0;
  }

  tb_uid_t *pUid = (tb_uid_t *)pStbCur->pKey;
  return *pUid;
}

585
// Build an STSchema for table `uid` at schema version `sver`.
// The schema wrapper is obtained from metaGetTableSchema (same `sver`/`lock`
// semantics), converted column by column through an STSchemaBuilder, and then
// released.
// Returns the result of tdGetSchemaFromBuilder, or NULL when no schema
// wrapper could be obtained.
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) {
  SSchemaWrapper *pWrapper = metaGetTableSchema(pMeta, uid, sver, lock);
  if (!pWrapper) return NULL;

  STSchemaBuilder builder = {0};
  tdInitTSchemaBuilder(&builder, pWrapper->version);

  for (int iCol = 0; iCol < pWrapper->nCols; iCol++) {
    SSchema *pCol = &pWrapper->pSchema[iCol];
    tdAddColToSchema(&builder, pCol->type, pCol->flags, pCol->colId, pCol->bytes);
  }

  STSchema *pTSchema = tdGetSchemaFromBuilder(&builder);

  tdDestroyTSchemaBuilder(&builder);

  // the wrapper returned by metaGetTableSchema is owned by this function
  taosMemoryFree(pWrapper->pSchema);
  taosMemoryFree(pWrapper);
  return pTSchema;
}

H
Hongze Cheng 已提交
609
// Build an STSchema from the schema db record of (suid ? suid : uid, sver)
// and store it in `*ppTSchema`.
//   sver : wanted schema version; when <= 0 the version is taken from
//          metaGetInfo, or failing that from the schema db record found at or
//          before (uid, INT32_MAX)
// Returns 0 on success, TSDB_CODE_NOT_FOUND when no matching record exists,
// TSDB_CODE_OUT_OF_MEMORY when the cursor cannot be opened or the schema
// cannot be built. `*ppTSchema` is not written on the early-exit error paths.
int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema **ppTSchema) {
  int32_t code = 0;

  void     *pData = NULL;
  int       nData = 0;
  SSkmDbKey skmDbKey;
  if (sver <= 0) {
    SMetaInfo info;
    if (metaGetInfo(pMeta, suid ? suid : uid, &info) == 0) {
      sver = info.skmVer;
    } else {
      TBC *pSkmDbC = NULL;
      int  c = 0;
      bool found = false;

      skmDbKey.uid = suid ? suid : uid;
      skmDbKey.sver = INT32_MAX;

      // Same error code as the cursor openers in this file use for a failed
      // tdbTbcOpen.
      if (tdbTbcOpen(pMeta->pSkmDb, &pSkmDbC, NULL) < 0) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _exit;
      }
      metaRLock(pMeta);

      if (tdbTbcMoveTo(pSkmDbC, &skmDbKey, sizeof(skmDbKey), &c) >= 0) {
        // sver == INT32_MAX is not expected to match a stored key exactly
        ASSERT(c);

        if (c < 0) {
          tdbTbcMoveToPrev(pSkmDbC);
        }

        const void *pKey = NULL;
        int32_t     nKey = 0;
        // Guard against a cursor that is not on a record before touching pKey.
        if (tdbTbcGet(pSkmDbC, &pKey, &nKey, NULL, NULL) >= 0 && pKey != NULL &&
            ((const SSkmDbKey *)pKey)->uid == skmDbKey.uid) {
          sver = ((const SSkmDbKey *)pKey)->sver;
          found = true;
        }
      }

      metaULock(pMeta);
      tdbTbcClose(pSkmDbC);

      if (!found) {
        code = TSDB_CODE_NOT_FOUND;
        goto _exit;
      }
    }
  }

  ASSERT(sver > 0);

  skmDbKey.uid = suid ? suid : uid;
  skmDbKey.sver = sver;
  metaRLock(pMeta);
  if (tdbTbGet(pMeta->pSkmDb, &skmDbKey, sizeof(SSkmDbKey), &pData, &nData) < 0) {
    metaULock(pMeta);
    code = TSDB_CODE_NOT_FOUND;
    goto _exit;
  }
  metaULock(pMeta);

  // decode
  SDecoder        dc = {0};
  // Zeroed so that a failed decode leaves nCols == 0 and a NULL pSchema
  // instead of indeterminate values that are iterated and freed below.
  SSchemaWrapper  schema = {0};
  SSchemaWrapper *pSchemaWrapper = &schema;

  tDecoderInit(&dc, pData, nData);
  (void)tDecodeSSchemaWrapper(&dc, pSchemaWrapper);
  tDecoderClear(&dc);
  tdbFree(pData);

  // convert
  STSchemaBuilder sb = {0};

  tdInitTSchemaBuilder(&sb, pSchemaWrapper->version);
  for (int i = 0; i < pSchemaWrapper->nCols; i++) {
    SSchema *pSchema = pSchemaWrapper->pSchema + i;
    tdAddColToSchema(&sb, pSchema->type, pSchema->flags, pSchema->colId, pSchema->bytes);
  }

  STSchema *pTSchema = tdGetSchemaFromBuilder(&sb);
  if (pTSchema == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }

  tdDestroyTSchemaBuilder(&sb);

  *ppTSchema = pTSchema;
  taosMemoryFree(pSchemaWrapper->pSchema);

_exit:
  return code;
}

705 706
// N.B. Called by statusReq per second
int64_t metaGetTbNum(SMeta *pMeta) {
  // Tables counted: child tables plus normal tables, both read from the
  // vnode's cached statistics (super tables are not included).
  int64_t nChild = pMeta->pVnode->config.vndStats.numOfCTables;
  int64_t nNormal = pMeta->pVnode->config.vndStats.numOfNTables;

  return nChild + nNormal;
}

// N.B. Called by statusReq per second
M
Minglei Jin 已提交
716
int64_t metaGetTimeSeriesNum(SMeta *pMeta) {
  // sum of (number of columns of stable -  1) * number of ctables (excluding timestamp column)
  //
  // The cached count is recomputed when it is not positive, or otherwise each
  // time the call counter reaches a multiple of 60 * 5. The counter is only
  // advanced when the cached count is positive.
  bool refresh = (pMeta->pVnode->config.vndStats.numOfTimeSeries <= 0);
  if (!refresh) {
    refresh = (++pMeta->pVnode->config.vndStats.itvTimeSeries % (60 * 5) == 0);
  }

  if (refresh) {
    int64_t num = 0;
    vnodeGetTimeSeriesNum(pMeta->pVnode, &num);
    pMeta->pVnode->config.vndStats.numOfTimeSeries = num;

    // restart the counter at an offset derived from the vnode id
    pMeta->pVnode->config.vndStats.itvTimeSeries = (TD_VID(pMeta->pVnode) % 100) * 2;
  }

  return pMeta->pVnode->config.vndStats.numOfTimeSeries + pMeta->pVnode->config.vndStats.numOfNTimeSeries;
}
H
Hongze Cheng 已提交
729

C
Cary Xu 已提交
730
typedef struct {
H
Hongze Cheng 已提交
731 732
  SMeta   *pMeta;
  TBC     *pCur;
C
Cary Xu 已提交
733
  tb_uid_t uid;
H
Hongze Cheng 已提交
734 735
  void    *pKey;
  void    *pVal;
C
Cary Xu 已提交
736 737 738 739 740 741 742 743 744 745 746 747 748
  int      kLen;
  int      vLen;
} SMSmaCursor;

// Open a cursor over the sma index positioned at the first record of table
// `uid` (key = (uid, INT64_MIN)).
// The meta read lock is taken here and stays held until metaCloseSmaCursor.
// Returns the cursor, or NULL when allocation (terrno =
// TSDB_CODE_OUT_OF_MEMORY) or opening the tdb cursor fails.
SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) {
  SMSmaCursor *pSmaCur = NULL;
  SSmaIdxKey   smaIdxKey;
  int          ret;
  // Initialised like in the other cursor openers so the comparison below
  // never reads an indeterminate value.
  int          c = 0;

  pSmaCur = (SMSmaCursor *)taosMemoryCalloc(1, sizeof(*pSmaCur));
  if (pSmaCur == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pSmaCur->pMeta = pMeta;
  pSmaCur->uid = uid;
  metaRLock(pMeta);

  ret = tdbTbcOpen(pMeta->pSmaIdx, &pSmaCur->pCur, NULL);
  if (ret < 0) {
    metaULock(pMeta);
    taosMemoryFree(pSmaCur);
    return NULL;
  }

  // move to the suid
  smaIdxKey.uid = uid;
  smaIdxKey.smaUid = INT64_MIN;
  tdbTbcMoveTo(pSmaCur->pCur, &smaIdxKey, sizeof(smaIdxKey), &c);
  if (c > 0) {
    tdbTbcMoveToNext(pSmaCur->pCur);
  }

  return pSmaCur;
}
H
Hongze Cheng 已提交
773

C
Cary Xu 已提交
774 775 776 777
// Destroy a cursor created by metaOpenSmaCursor: release the meta read lock,
// close the tdb cursor and free its key/value buffers when a cursor is
// present, then free the cursor itself. NULL is accepted and ignored.
void metaCloseSmaCursor(SMSmaCursor *pSmaCur) {
  if (pSmaCur == NULL) {
    return;
  }

  if (pSmaCur->pMeta != NULL) {
    metaULock(pSmaCur->pMeta);
  }

  if (pSmaCur->pCur != NULL) {
    tdbTbcClose(pSmaCur->pCur);

    tdbFree(pSmaCur->pKey);
    tdbFree(pSmaCur->pVal);
  }

  taosMemoryFree(pSmaCur);
}

// Fetch the next record of the sma index.
// Returns the table uid stored in the record's key, or 0 when the cursor is
// exhausted or that uid is greater than the uid the cursor was opened for.
// The full key (including smaUid) stays available in pSmaCur->pKey.
tb_uid_t metaSmaCursorNext(SMSmaCursor *pSmaCur) {
  if (tdbTbcNext(pSmaCur->pCur, &pSmaCur->pKey, &pSmaCur->kLen, &pSmaCur->pVal, &pSmaCur->vLen) < 0) {
    return 0;
  }

  SSmaIdxKey *pHit = pSmaCur->pKey;

  return (pHit->uid > pSmaCur->uid) ? 0 : pHit->uid;
}

// Collect the STSma definitions attached to table `uid`.
//   deepCopy : true  -> expr / tagsFilter are duplicated into fresh buffers
//              false -> expr / tagsFilter are dropped (NULL, length 0)
// Sma ids whose entry cannot be loaded are skipped with a warning.
// Returns a wrapper holding the loaded definitions, or NULL when the table
// has no sma ids, none of them could be loaded, or an allocation failed
// (terrno = TSDB_CODE_OUT_OF_MEMORY in that case).
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
  STSmaWrapper *pSW = NULL;
  SArray       *pSmaIds = NULL;
  // Declared and zeroed up front: the early `goto _err` paths below reach
  // metaReaderClear(&mr) before metaReaderInit has run.
  SMetaReader   mr = {0};

  if (!(pSmaIds = metaGetSmaIdsByTable(pMeta, uid))) {
    return NULL;
  }

  pSW = taosMemoryCalloc(1, sizeof(*pSW));
  if (!pSW) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pSW->number = taosArrayGetSize(pSmaIds);
  pSW->tSma = taosMemoryCalloc(pSW->number, sizeof(STSma));

  if (!pSW->tSma) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  metaReaderInit(&mr, pMeta, 0);
  int64_t smaId;
  int     smaIdx = 0;
  STSma  *pTSma = NULL;
  for (int i = 0; i < pSW->number; ++i) {
    smaId = *(tb_uid_t *)taosArrayGet(pSmaIds, i);
    if (metaGetTableEntryByUid(&mr, smaId) < 0) {
      tDecoderClear(&mr.coder);
      metaWarn("vgId:%d, no entry for tbId:%" PRIi64 ", smaId:%" PRIi64, TD_VID(pMeta->pVnode), uid, smaId);
      continue;
    }

    pTSma = pSW->tSma + smaIdx;
    memcpy(pTSma, mr.me.smaEntry.tsma, sizeof(STSma));

    // The struct copy above duplicated pointers that belong to the reader.
    // Detach them before anything can fail, so the error path never hands
    // reader-owned memory to tFreeTSmaWrapper.
    pTSma->expr = NULL;
    pTSma->tagsFilter = NULL;

    if (deepCopy) {
      if (pTSma->exprLen > 0) {
        if (!(pTSma->expr = taosMemoryCalloc(1, pTSma->exprLen))) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          goto _err;
        }
        memcpy((void *)pTSma->expr, mr.me.smaEntry.tsma->expr, pTSma->exprLen);
      }
      if (pTSma->tagsFilterLen > 0) {
        if (!(pTSma->tagsFilter = taosMemoryCalloc(1, pTSma->tagsFilterLen))) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          goto _err;
        }
        // copy only when a destination buffer was allocated
        memcpy((void *)pTSma->tagsFilter, mr.me.smaEntry.tsma->tagsFilter, pTSma->tagsFilterLen);
      }
    } else {
      pTSma->exprLen = 0;
      pTSma->tagsFilterLen = 0;
    }

    // NOTE(review): cleared only after the entry has been copied, on the
    // assumption that the decoded entry may live in decoder-owned memory -
    // confirm against metaDecodeEntry / tDecoderClear.
    tDecoderClear(&mr.coder);

    ++smaIdx;
  }

  if (smaIdx <= 0) goto _err;
  pSW->number = smaIdx;

  metaReaderClear(&mr);
  taosArrayDestroy(pSmaIds);
  return pSW;
_err:
  metaReaderClear(&mr);
  taosArrayDestroy(pSmaIds);
  tFreeTSmaWrapper(pSW, deepCopy);
  return NULL;
}

C
Cary Xu 已提交
880
// Return a heap copy of the STSma stored in the entry of `indexUid`.
// Only the struct itself is copied (memcpy of sizeof(STSma)).
// NOTE(review): pointer members of the copy are not duplicated and the reader
// is cleared before returning - confirm callers do not dereference them.
// Returns NULL when the entry cannot be loaded or the allocation fails
// (terrno = TSDB_CODE_OUT_OF_MEMORY in the latter case).
STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
  STSma      *pCopy = NULL;
  SMetaReader reader = {0};

  metaReaderInit(&reader, pMeta, 0);

  if (metaGetTableEntryByUid(&reader, indexUid) < 0) {
    metaWarn("vgId:%d, failed to get table entry for smaId:%" PRIi64, TD_VID(pMeta->pVnode), indexUid);
  } else {
    pCopy = (STSma *)taosMemoryMalloc(sizeof(STSma));
    if (pCopy == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(pCopy, reader.me.smaEntry.tsma, sizeof(STSma));
    }
  }

  metaReaderClear(&reader);
  return pCopy;
}

C
Cary Xu 已提交
902
// Collect the sma uids recorded in the sma index for table `uid`.
// The result array is created lazily on the first hit.
// Returns an array of tb_uid_t, or NULL when the cursor cannot be opened, no
// record is found, or an allocation fails (terrno = TSDB_CODE_OUT_OF_MEMORY
// in the latter case).
SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) {
  SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid);
  if (pCur == NULL) {
    return NULL;
  }

  SArray *pUids = NULL;

  for (tb_uid_t id = metaSmaCursorNext(pCur); id != 0; id = metaSmaCursorNext(pCur)) {
    if (pUids == NULL) {
      pUids = taosArrayInit(16, sizeof(tb_uid_t));
      if (pUids == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        metaCloseSmaCursor(pCur);
        return NULL;
      }
    }

    // the cursor's current key carries the sma uid of this record
    SSmaIdxKey *pHit = (SSmaIdxKey *)pCur->pKey;

    if (!taosArrayPush(pUids, &pHit->smaUid)) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      metaCloseSmaCursor(pCur);
      taosArrayDestroy(pUids);
      return NULL;
    }
  }

  metaCloseSmaCursor(pCur);
  return pUids;
}

C
Cary Xu 已提交
940
// Collect the table uids returned by a sma cursor opened with uid 0,
// collapsing runs of consecutive equal uids into one element.
// The result array is created lazily on the first hit.
// Returns an array of tb_uid_t, or NULL when the cursor cannot be opened, no
// record is found, or an allocation fails (terrno = TSDB_CODE_OUT_OF_MEMORY
// in the latter case).
SArray *metaGetSmaTbUids(SMeta *pMeta) {
  SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, 0);
  if (pCur == NULL) {
    return NULL;
  }

  SArray  *pUids = NULL;
  tb_uid_t lastUid = 0;

  for (tb_uid_t uid = metaSmaCursorNext(pCur); uid != 0; uid = metaSmaCursorNext(pCur)) {
    // same table as the previous record: already collected
    if (uid == lastUid) {
      continue;
    }
    lastUid = uid;

    if (pUids == NULL) {
      pUids = taosArrayInit(16, sizeof(tb_uid_t));
      if (pUids == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        metaCloseSmaCursor(pCur);
        return NULL;
      }
    }

    if (!taosArrayPush(pUids, &uid)) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      metaCloseSmaCursor(pCur);
      taosArrayDestroy(pUids);
      return NULL;
    }
  }

  metaCloseSmaCursor(pCur);
  return pUids;
}

L
Liu Jicong 已提交
983
#endif
H
Hongze Cheng 已提交
984

985
/*
 * Look up one tag value inside an encoded tag blob.
 *
 * pTag  encoded tag data; it is treated as an STag.
 * type  type of the tag column being read. For TSDB_DATA_TYPE_JSON the blob
 *       itself is returned and val is not touched.
 * val   handed to tTagGet(), which performs the lookup.
 *
 * Returns pTag for JSON tags, val when tTagGet() reports the tag as found,
 * and NULL otherwise.
 *
 * When TAG_FILTER_DEBUG is defined the found value and every value in the
 * blob are additionally written to the debug log.
 */
const void *metaGetTableTagVal(void *pTag, int16_t type, STagVal *val) {
  STag *tag = (STag *)pTag;
  if (type == TSDB_DATA_TYPE_JSON) {
    return tag;
  }

  bool find = tTagGet(tag, val);
  if (!find) {
    return NULL;
  }

#ifdef TAG_FILTER_DEBUG
  if (IS_VAR_DATA_TYPE(val->type)) {
    // Copy into a zero-filled buffer one byte larger so it can be printed with %s.
    char *buf = taosMemoryCalloc(val->nData + 1, 1);
    if (buf != NULL) {
      memcpy(buf, val->pData, val->nData);
      metaDebug("metaTag table val varchar index:%d cid:%d type:%d value:%s", 1, val->cid, val->type, buf);
      taosMemoryFree(buf);
    }
  } else {
    double dval = 0;
    GET_TYPED_DATA(dval, double, val->type, &val->i64);
    metaDebug("metaTag table val number index:%d cid:%d type:%d value:%f", 1, val->cid, val->type, dval);
  }

  SArray *pTagVals = NULL;
  tTagToValArray((STag *)pTag, &pTagVals);
  if (pTagVals != NULL) {
    for (int i = 0; i < taosArrayGetSize(pTagVals); i++) {
      STagVal *pTagVal = (STagVal *)taosArrayGet(pTagVals, i);

      if (IS_VAR_DATA_TYPE(pTagVal->type)) {
        char *buf = taosMemoryCalloc(pTagVal->nData + 1, 1);
        if (buf == NULL) {
          continue;  // debug output only: skip this entry rather than memcpy into NULL
        }
        memcpy(buf, pTagVal->pData, pTagVal->nData);
        metaDebug("metaTag table varchar index:%d cid:%d type:%d value:%s", i, pTagVal->cid, pTagVal->type, buf);
        taosMemoryFree(buf);
      } else {
        double dval = 0;
        GET_TYPED_DATA(dval, double, pTagVal->type, &pTagVal->i64);
        metaDebug("metaTag table number index:%d cid:%d type:%d value:%f", i, pTagVal->cid, pTagVal->type, dval);
      }
    }
    // The array is only needed for the dump above; release it so the debug build does not leak.
    taosArrayDestroy(pTagVals);
  }
#endif

  return val;
}
wmmhello's avatar
wmmhello 已提交
1028 1029

typedef struct {
H
Hongze Cheng 已提交
1030 1031
  SMeta   *pMeta;
  TBC     *pCur;
wmmhello's avatar
wmmhello 已提交
1032 1033 1034
  tb_uid_t suid;
  int16_t  cid;
  int16_t  type;
H
Hongze Cheng 已提交
1035 1036
  void    *pKey;
  void    *pVal;
wmmhello's avatar
wmmhello 已提交
1037 1038 1039 1040
  int32_t  kLen;
  int32_t  vLen;
} SIdxCursor;

dengyihao's avatar
dengyihao 已提交
1041 1042 1043
/*
 * Scan the tag index and append to pUids the uid of every entry accepted by
 * param->filterFunc.
 *
 * pMeta  meta store; its read lock is held for the duration of the scan.
 * param  filter description: suid/cid/type identify the tag column, val is
 *        the value to compare against, reverse selects the scan direction and
 *        filterFunc is the comparison callback. As used here, filterFunc
 *        returns 0 for a match, 1 for "no match, keep iterating" and any
 *        other value for "no match, stop".
 * pUids  output array of tb_uid_t; matches are appended.
 *
 * Returns 0 on success (including "nothing matched") and a non-zero value on
 * failure: allocation failure (terrno = TSDB_CODE_OUT_OF_MEMORY), a NULL
 * filter value, NCHAR conversion failure, or an error from tdbTbcOpen() /
 * metaCreateTagIdxKey(). A failing tdbTbcMoveTo() ends the scan without
 * changing the return value.
 */
int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
  int32_t ret = 0;
  char   *buf = NULL;

  STagIdxKey *pKey = NULL;
  int32_t     nKey = 0;

  int32_t maxSize = 0;
  int32_t nTagData = 0;
  void   *tagData = NULL;

  int  moveCmp = 0;
  bool first = true;

  SIdxCursor *pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor));
  if (pCursor == NULL) {
    // Nothing has been locked or opened yet, so there is nothing to unwind.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pCursor->pMeta = pMeta;
  pCursor->suid = param->suid;
  pCursor->cid = param->cid;
  pCursor->type = param->type;

  metaRLock(pMeta);
  ret = tdbTbcOpen(pMeta->pTagIdx, &pCursor->pCur, NULL);
  if (ret < 0) {
    goto END;
  }

  if (param->val == NULL) {
    metaError("vgId:%d, failed to filter NULL data", TD_VID(pMeta->pVnode));
    ret = -1;  // report the failure instead of returning an empty "success"
    goto END;
  }

  if (IS_VAR_DATA_TYPE(param->type)) {
    tagData = varDataVal(param->val);
    nTagData = varDataLen(param->val);

    if (param->type == TSDB_DATA_TYPE_NCHAR) {
      // Convert the multi-byte input to UCS-4 before building the search key.
      maxSize = 4 * nTagData + 1;
      buf = taosMemoryCalloc(1, maxSize);
      if (buf == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        ret = -1;
        goto END;
      }
      if (false == taosMbsToUcs4(tagData, nTagData, (TdUcs4 *)buf, maxSize, &maxSize)) {
        ret = -1;
        goto END;
      }

      tagData = buf;
      nTagData = maxSize;
    }
  } else {
    tagData = param->val;
    nTagData = tDataTypes[param->type].bytes;
  }

  // The uid part of the key is set to the extreme value matching the scan direction.
  ret = metaCreateTagIdxKey(pCursor->suid, pCursor->cid, tagData, nTagData, pCursor->type,
                            param->reverse ? INT64_MAX : INT64_MIN, &pKey, &nKey);
  if (ret != 0) {
    goto END;
  }

  if (tdbTbcMoveTo(pCursor->pCur, pKey, nKey, &moveCmp) < 0) {
    goto END;
  }

  while (1) {
    void   *entryKey = NULL, *entryVal = NULL;
    int32_t nEntryKey, nEntryVal;

    if (tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, (const void **)&entryVal, &nEntryVal) < 0) {
      break;
    }

    STagIdxKey *p = entryKey;
    if (p == NULL) {
      // Must be tested before any field of p is read.
      break;
    }

    if (p->type != pCursor->type) {
      if (!first) {
        break;
      }
      // No entry of the requested type has been seen yet: keep stepping in the scan direction.
      if ((param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur)) < 0) {
        break;
      }
      continue;
    }

    if (p->suid != pKey->suid) {
      break;
    }
    first = false;

    int32_t verdict = (*param->filterFunc)(p->data, pKey->data, pKey->type);
    if (verdict == 0) {
      // match: the table uid is stored right after the tag value inside the key
      tb_uid_t tuid = 0;
      if (IS_VAR_DATA_TYPE(pKey->type)) {
        tuid = *(tb_uid_t *)(p->data + varDataTLen(p->data));
      } else {
        tuid = *(tb_uid_t *)(p->data + tDataTypes[pCursor->type].bytes);
      }
      if (!taosArrayPush(pUids, &tuid)) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        ret = -1;
        break;
      }
    } else if (verdict == 1) {
      // not match but should continue to iter
    } else {
      // not match and no more result
      break;
    }

    if ((param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur)) < 0) {
      break;
    }
  }

END:
  if (pCursor->pMeta) metaULock(pCursor->pMeta);
  if (pCursor->pCur) tdbTbcClose(pCursor->pCur);
  taosMemoryFree(buf);
  taosMemoryFree(pKey);

  taosMemoryFree(pCursor);

  return ret;
}
H
Hongze Cheng 已提交
1155

dengyihao's avatar
dengyihao 已提交
1156
/*
 * Read the pCtbIdx entry stored under (suid, uid).
 *
 * tag/len are passed straight through to tdbTbGet(), which fills them in.
 * When lock is true the meta read lock is taken around the lookup; pass
 * false when the caller already holds it.
 *
 * Returns whatever tdbTbGet() returns.
 */
static int32_t metaGetTableTagByUid(SMeta *pMeta, int64_t suid, int64_t uid, void **tag, int32_t *len, bool lock) {
  SCtbIdxKey key = {.suid = suid, .uid = uid};

  if (lock) {
    metaRLock(pMeta);
  }

  int rc = tdbTbGet(pMeta->pCtbIdx, &key, sizeof(key), tag, len);

  if (lock) {
    metaULock(pMeta);
  }

  return rc;
}
dengyihao's avatar
dengyihao 已提交
1170
/*
 * For every uid in uidList that is not yet a key of `tags`, look up its
 * entry via metaGetTableTagByUid() and store a copy in `tags`, keyed by uid.
 *
 * The meta read lock is released and re-acquired every LIMIT entries rather
 * than being held across the whole list. A failed lookup is logged and the
 * uid is skipped. A NULL uidList is treated as empty.
 *
 * Always returns 0.
 */
int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList, SHashObj *tags) {
  const int32_t LIMIT = 128;

  int32_t locked = false;
  int32_t nUids = (uidList != NULL) ? taosArrayGetSize(uidList) : 0;

  for (int idx = 0; idx < nUids; idx++) {
    tb_uid_t *pUid = taosArrayGet(uidList, idx);

    // Start of a new batch: drop the lock held for the previous batch, then take it again.
    if (idx % LIMIT == 0) {
      if (locked) {
        metaULock(pMeta);
      }
      metaRLock(pMeta);
      locked = true;
    }

    if (taosHashGet(tags, pUid, sizeof(tb_uid_t)) != NULL) {
      continue;  // already present
    }

    void   *pTagBuf = NULL;
    int32_t tagLen = 0;
    if (metaGetTableTagByUid(pMeta, suid, *pUid, &pTagBuf, &tagLen, false) != 0) {
      metaError("vgId:%d, failed to table IDs, suid: %" PRId64 ", uid: %" PRId64 "", TD_VID(pMeta->pVnode), suid,
                *pUid);
      continue;
    }

    taosHashPut(tags, pUid, sizeof(tb_uid_t), pTagBuf, tagLen);
    tdbFree(pTagBuf);
  }

  if (locked) {
    metaULock(pMeta);
  }

  return 0;
}
dengyihao's avatar
dengyihao 已提交
1201

1202
/*
 * Iterate the child tables of super table `suid` and store each table's
 * cursor value (pCur->pVal / pCur->vLen) in `tags`, keyed by table uid.
 *
 * uidList doubles as filter and output:
 *   - non-empty on entry: only uids contained in it are stored;
 *   - empty on entry: every uid visited is stored and also appended to it.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the cursor or
 * the temporary uid set cannot be created.
 */
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj *tags) {
  SMCtbCursor *pCur = metaOpenCtbCursor(pMeta, suid, 1);
  if (pCur == NULL) {
    // NOTE(review): the open call only reports NULL; out-of-memory is the assumed cause - confirm.
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SHashObj *uHash = NULL;
  size_t    len = taosArrayGetSize(uidList);  // len > 0 means the caller supplied the uids to keep
  if (len > 0) {
    uHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
    if (uHash == NULL) {
      // Without the set no uid could be recognised as requested, so fail instead of returning nothing.
      metaCloseCtbCursor(pCur, 1);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    for (int i = 0; (size_t)i < len; i++) {
      int64_t *uid = taosArrayGet(uidList, i);
      // Only membership is queried later; the stored index is never read back here.
      taosHashPut(uHash, uid, sizeof(int64_t), &i, sizeof(i));
    }
  }

  // A uid of 0 from the cursor ends the iteration.
  for (tb_uid_t id = metaCtbCursorNext(pCur); id != 0; id = metaCtbCursorNext(pCur)) {
    if (len > 0 && taosHashGet(uHash, &id, sizeof(int64_t)) == NULL) {
      continue;
    } else if (len == 0) {
      taosArrayPush(uidList, &id);
    }

    taosHashPut(tags, &id, sizeof(int64_t), pCur->pVal, pCur->vLen);
  }

  taosHashCleanup(uHash);
  metaCloseCtbCursor(pCur, 1);
  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
1233

H
Hongze Cheng 已提交
1234 1235
// Forward declaration: metaGetInfo() below calls this to look up `uid` in the cache and
// treats a return value of 0 as a hit. The definition is not in this part of the file.
int32_t metaCacheGet(SMeta *pMeta, int64_t uid, SMetaInfo *pInfo);

H
Hongze Cheng 已提交
1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272
/*
 * Fill pInfo with the suid / version / skmVer recorded for table `uid`.
 *
 * The cache is consulted first. On a miss the entry is read from pUidIdx
 * under the read lock and then, after that lock has been dropped, written
 * into the cache under the write lock.
 *
 * Returns 0 on success and TSDB_CODE_NOT_FOUND when the uid is in neither
 * the cache nor pUidIdx.
 */
int32_t metaGetInfo(SMeta *pMeta, int64_t uid, SMetaInfo *pInfo) {
  int32_t result = 0;
  void   *pRaw = NULL;
  int     rawLen = 0;

  metaRLock(pMeta);
  // Short-circuit: the TDB lookup only runs when the cache missed.
  bool cached = (metaCacheGet(pMeta, uid, pInfo) == 0);
  bool found = cached || (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pRaw, &rawLen) >= 0);
  metaULock(pMeta);

  if (!found) {
    result = TSDB_CODE_NOT_FOUND;
  } else if (!cached) {
    SUidIdxVal *pIdxVal = (SUidIdxVal *)pRaw;

    pInfo->uid = uid;
    pInfo->suid = pIdxVal->suid;
    pInfo->version = pIdxVal->version;
    pInfo->skmVer = pIdxVal->skmVer;

    // upsert the cache
    metaWLock(pMeta);
    metaCacheUpsert(pMeta, pInfo);
    metaULock(pMeta);
  }

  tdbFree(pRaw);
  return result;
}
1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301

/*
 * Fill pInfo with the statistics of super table `uid`.
 *
 * Fast path: the stats cache already has the entry. Slow path: the child
 * table count is obtained from vnodeGetCtbNum() under the read lock, stored
 * in pInfo, and then written into the cache under the write lock.
 *
 * Always returns 0.
 */
int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo) {
  int64_t nCtb = 0;

  metaRLock(pMeta);
  bool hit = (metaStatsCacheGet(pMeta, uid, pInfo) == TSDB_CODE_SUCCESS);
  if (!hit) {
    // slow path: search TDB
    vnodeGetCtbNum(pMeta->pVnode, uid, &nCtb);
  }
  metaULock(pMeta);

  if (!hit) {
    pInfo->uid = uid;
    pInfo->ctbNum = nCtb;

    // upsert the cache
    metaWLock(pMeta);
    metaStatsCacheUpsert(pMeta, pInfo);
    metaULock(pMeta);
  }

  return 0;
}