metaQuery.c 28.9 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "meta.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
// Prepare `pReader` for use: zero every field, record the owning meta handle
// and the caller's flags, then take the meta read lock. The lock is released
// again by metaReaderClear().
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) {
  memset(pReader, 0, sizeof(SMetaReader));

  pReader->pMeta = pMeta;
  pReader->flags = flags;

  metaRLock(pMeta);
}
H
Hongze Cheng 已提交
24

H
Hongze Cheng 已提交
25
// Tear down a reader: release the meta lock if the reader is bound to a meta
// handle, then clear the decoder state and free the value buffer.
void metaReaderClear(SMetaReader *pReader) {
  SMeta *pLocked = pReader->pMeta;

  if (pLocked != NULL) {
    metaULock(pLocked);
  }

  tDecoderClear(&pReader->coder);
  tdbFree(pReader->pBuf);
}

H
Hongze Cheng 已提交
33
// Load the table entry stored in table.db under (version, uid) into the
// reader and decode it into pReader->me.
//
// Returns 0 on success and -1 on failure. When the key is not present in
// table.db, terrno is set to TSDB_CODE_PAR_TABLE_NOT_EXIST.
int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t uid) {
  STbDbKey key = {.version = version, .uid = uid};

  // Fetch the raw entry from table.db into the reader-owned buffer.
  int rc = tdbTbGet(pReader->pMeta->pTbDb, &key, sizeof(key), &pReader->pBuf, &pReader->szBuf);
  if (rc < 0) {
    terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
    return -1;
  }

  // Decode the fetched bytes in place.
  tDecoderInit(&pReader->coder, pReader->pBuf, pReader->szBuf);
  return (metaDecodeEntry(&pReader->coder, &pReader->me) < 0) ? -1 : 0;
}

dengyihao's avatar
dengyihao 已提交
56
// int metaGetTableEntryByUidTest(void* meta, SArray *uidList) {
wmmhello's avatar
wmmhello 已提交
57
//
dengyihao's avatar
dengyihao 已提交
58 59 60 61 62
//   SArray* readerList = taosArrayInit(taosArrayGetSize(uidList), sizeof(SMetaReader));
//   SArray* uidVersion = taosArrayInit(taosArrayGetSize(uidList), sizeof(STbDbKey));
//   SMeta  *pMeta = meta;
//   int64_t version;
//   SHashObj *uHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
wmmhello's avatar
wmmhello 已提交
63
//
dengyihao's avatar
dengyihao 已提交
64 65 66 67 68 69 70 71 72 73
//   int64_t stt1 = taosGetTimestampUs();
//   for(int i = 0; i < taosArrayGetSize(uidList); i++) {
//     void* ppVal = NULL;
//     int vlen = 0;
//     uint64_t *  uid = taosArrayGet(uidList, i);
//     // query uid.idx
//     if (tdbTbGet(pMeta->pUidIdx, uid, sizeof(*uid), &ppVal, &vlen) < 0) {
//       continue;
//     }
//     version = *(int64_t *)ppVal;
wmmhello's avatar
wmmhello 已提交
74
//
dengyihao's avatar
dengyihao 已提交
75 76 77 78 79 80
//     STbDbKey tbDbKey = {.version = version, .uid = *uid};
//     taosArrayPush(uidVersion, &tbDbKey);
//     taosHashPut(uHash, uid, sizeof(int64_t), ppVal, sizeof(int64_t));
//   }
//   int64_t stt2 = taosGetTimestampUs();
//   qDebug("metaGetTableEntryByUidTest1 rows:%d, cost:%ld us", taosArrayGetSize(uidList), stt2-stt1);
wmmhello's avatar
wmmhello 已提交
81
//
dengyihao's avatar
dengyihao 已提交
82 83 84 85 86
//   TBC        *pCur = NULL;
//   tdbTbcOpen(pMeta->pTbDb, &pCur, NULL);
//   tdbTbcMoveToFirst(pCur);
//   void *pKey = NULL;
//   int   kLen = 0;
wmmhello's avatar
wmmhello 已提交
87
//
dengyihao's avatar
dengyihao 已提交
88 89 90 91 92 93 94 95 96 97
//   while(1){
//     SMetaReader pReader = {0};
//     int32_t ret = tdbTbcNext(pCur, &pKey, &kLen, &pReader.pBuf, &pReader.szBuf);
//     if (ret < 0) break;
//     STbDbKey *tmp = (STbDbKey*)pKey;
//     int64_t *ver = (int64_t*)taosHashGet(uHash, &tmp->uid, sizeof(int64_t));
//     if(ver == NULL || *ver != tmp->version) continue;
//     taosArrayPush(readerList, &pReader);
//   }
//   tdbTbcClose(pCur);
wmmhello's avatar
wmmhello 已提交
98
//
dengyihao's avatar
dengyihao 已提交
99 100 101 102 103
//   taosArrayClear(readerList);
//   int64_t stt3 = taosGetTimestampUs();
//   qDebug("metaGetTableEntryByUidTest2 rows:%d, cost:%ld us", taosArrayGetSize(uidList), stt3-stt2);
//   for(int i = 0; i < taosArrayGetSize(uidVersion); i++) {
//     SMetaReader pReader = {0};
wmmhello's avatar
wmmhello 已提交
104
//
dengyihao's avatar
dengyihao 已提交
105 106 107 108 109 110 111 112 113
//     STbDbKey *tbDbKey = taosArrayGet(uidVersion, i);
//     // query table.db
//     if (tdbTbGet(pMeta->pTbDb, tbDbKey, sizeof(STbDbKey), &pReader.pBuf, &pReader.szBuf) < 0) {
//       continue;
//     }
//     taosArrayPush(readerList, &pReader);
//   }
//   int64_t stt4 = taosGetTimestampUs();
//   qDebug("metaGetTableEntryByUidTest3 rows:%d, cost:%ld us", taosArrayGetSize(uidList), stt4-stt3);
wmmhello's avatar
wmmhello 已提交
114
//
dengyihao's avatar
dengyihao 已提交
115 116 117 118 119
//   for(int i = 0; i < taosArrayGetSize(readerList); i++){
//     SMetaReader* pReader  = taosArrayGet(readerList, i);
//     metaReaderInit(pReader, meta, 0);
//     // decode the entry
//     tDecoderInit(&pReader->coder, pReader->pBuf, pReader->szBuf);
wmmhello's avatar
wmmhello 已提交
120
//
dengyihao's avatar
dengyihao 已提交
121 122 123 124 125 126 127 128
//     if (metaDecodeEntry(&pReader->coder, &pReader->me) < 0) {
//     }
//     metaReaderClear(pReader);
//   }
//   int64_t stt5 = taosGetTimestampUs();
//   qDebug("metaGetTableEntryByUidTest4 rows:%d, cost:%ld us", taosArrayGetSize(readerList), stt5-stt4);
//   return 0;
// }
wmmhello's avatar
wmmhello 已提交
129

H
Hongze Cheng 已提交
130
// Report whether `uid` has an entry in uid.idx. The lookup runs under the
// meta read lock, which is released before returning.
bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid) {
  bool found;

  metaRLock(pMeta);
  found = (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), NULL, NULL) >= 0);
  metaULock(pMeta);

  return found;
}

H
Hongze Cheng 已提交
145
// Resolve `uid` through uid.idx to the version recorded there, then load
// that entry via metaGetTableEntryByVersion().
//
// Returns -1 with terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST when the uid is not
// indexed; otherwise returns the result of the by-version lookup.
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
  int rc = tdbTbGet(pReader->pMeta->pUidIdx, &uid, sizeof(uid), &pReader->pBuf, &pReader->szBuf);
  if (rc < 0) {
    terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
    return -1;
  }

  // Read the version out of the buffer before the next lookup reuses it.
  int64_t version = ((SUidIdxVal *)pReader->pBuf)->version;

  return metaGetTableEntryByVersion(pReader, version, uid);
}

H
Hongze Cheng 已提交
159
// Resolve a table name through name.idx to its uid, then load the entry via
// metaGetTableEntryByUid(). The lookup key is the name including its
// terminating NUL byte.
//
// Returns -1 with terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST when the name is not
// indexed; otherwise returns the result of the by-uid lookup.
int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
  int rc = tdbTbGet(pReader->pMeta->pNameIdx, name, strlen(name) + 1, &pReader->pBuf, &pReader->szBuf);
  if (rc < 0) {
    terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
    return -1;
  }

  // Read the uid out of the buffer before the next lookup reuses it.
  tb_uid_t resolved = *(tb_uid_t *)pReader->pBuf;

  return metaGetTableEntryByUid(pReader, resolved);
}

H
Hongze Cheng 已提交
173
// Look up the uid registered for `name` in name.idx, under the meta read
// lock. Returns the uid, or 0 when the lookup does not succeed.
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {
  tb_uid_t result = 0;
  void    *pVal = NULL;
  int      valLen = 0;

  metaRLock(pMeta);

  int rc = tdbTbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pVal, &valLen);
  if (rc == 0) {
    result = *(tb_uid_t *)pVal;
    tdbFree(pVal);
  }

  metaULock(pMeta);

  return result;
}

dengyihao's avatar
dengyihao 已提交
190
int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName) {
191
  int         code = 0;
D
dapan1121 已提交
192
  SMetaReader mr = {0};
dengyihao's avatar
dengyihao 已提交
193
  metaReaderInit(&mr, (SMeta *)meta, 0);
194 195 196 197 198
  code = metaGetTableEntryByUid(&mr, uid);
  if (code < 0) {
    metaReaderClear(&mr);
    return -1;
  }
D
dapan1121 已提交
199 200 201 202 203 204 205

  STR_TO_VARSTR(tbName, mr.me.name);
  metaReaderClear(&mr);

  return 0;
}

H
Hongze Cheng 已提交
206
// Placeholder: advancing a reader is not implemented yet. Always returns 0
// and does not touch the reader.
int metaReadNext(SMetaReader *pReader) {
  (void)pReader;  // unused until the TODO below is implemented

  // TODO

  return 0;
}

#if 1  // ===================================================
H
Hongze Cheng 已提交
215 216 217 218 219 220 221 222
// Allocate a table cursor over uid.idx positioned at the first entry.
//
// The embedded reader is initialised with metaReaderInit(), which takes the
// meta read lock; the lock is held until metaCloseTbCursor() clears the
// reader.
//
// Returns NULL if the cursor cannot be allocated or the tdb cursor cannot be
// opened. In the latter case the reader is cleared first so the read lock is
// not left held.
SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
  SMTbCursor *pTbCur = (SMTbCursor *)taosMemoryCalloc(1, sizeof(*pTbCur));
  if (pTbCur == NULL) {
    return NULL;
  }

  metaReaderInit(&pTbCur->mr, pMeta, 0);

  if (tdbTbcOpen(pMeta->pUidIdx, &pTbCur->pDbc, NULL) < 0) {
    // Undo metaReaderInit() (drops the read lock) before giving up.
    metaReaderClear(&pTbCur->mr);
    taosMemoryFree(pTbCur);
    return NULL;
  }

  tdbTbcMoveToFirst(pTbCur->pDbc);

  return pTbCur;
}

// Release everything owned by a table cursor: key/value buffers, the embedded
// reader (metaReaderClear), the tdb cursor if one is open, and the cursor
// itself. A NULL cursor is ignored.
void metaCloseTbCursor(SMTbCursor *pTbCur) {
  if (pTbCur == NULL) {
    return;
  }

  tdbFree(pTbCur->pKey);
  tdbFree(pTbCur->pVal);
  metaReaderClear(&pTbCur->mr);

  if (pTbCur->pDbc != NULL) {
    tdbTbcClose(pTbCur->pDbc);
  }

  taosMemoryFree(pTbCur);
}

H
Hongze Cheng 已提交
244
// Advance the cursor to the next table that is not a super table and load its
// entry into pTbCur->mr.me.
//
// Returns 0 when an entry was loaded, -1 when the underlying cursor is
// exhausted or the entry for the current key cannot be loaded/decoded.
// (Previously a failed load was ignored and the stale contents of mr.me were
// reported as the next table.)
int metaTbCursorNext(SMTbCursor *pTbCur) {
  for (;;) {
    if (tdbTbcNext(pTbCur->pDbc, &pTbCur->pKey, &pTbCur->kLen, &pTbCur->pVal, &pTbCur->vLen) < 0) {
      return -1;
    }

    // Drop decoder state left over from the previous entry before reusing it.
    tDecoderClear(&pTbCur->mr.coder);

    // uid.idx key is the uid; its value carries the entry version.
    if (metaGetTableEntryByVersion(&pTbCur->mr, ((SUidIdxVal *)pTbCur->pVal)[0].version,
                                   *(tb_uid_t *)pTbCur->pKey) < 0) {
      return -1;
    }

    if (pTbCur->mr.me.type != TSDB_SUPER_TABLE) {
      return 0;
    }
    // Super tables are skipped; keep scanning.
  }
}

H
Hongze Cheng 已提交
268
// Return a heap-allocated clone of the schema of table `uid` at schema
// version `sver` (-1 means "whatever the current entry carries"), or NULL on
// failure. The whole lookup runs under the meta read lock.
//
// Resolution order:
//   1. uid.idx -> entry version -> table.db entry.
//   2. Child table: restart the lookup with its super table uid.
//   3. Super/normal table whose entry matches `sver` (or sver == -1): clone
//      the schema embedded in the entry.
//   4. Super table with a different version: walk table.db backwards from
//      (sver, INT64_MAX) and clone the schema of the first entry with the
//      same uid.
//   5. Otherwise: fetch (uid, sver) from the schema db.
//
// `isinline` is not used by this function.
//
// Unlike earlier revisions, a failed table.db lookup, a failed entry decode or
// a failed cursor open now returns NULL instead of operating on stale data.
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) {
  void           *pData = NULL;
  int             nData = 0;
  int64_t         version;
  SSchemaWrapper  schema = {0};
  SSchemaWrapper *pSchema = NULL;
  SDecoder        dc = {0};
  SMetaEntry      me;

  (void)isinline;

  metaRLock(pMeta);
_query:
  if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData) < 0) {
    goto _err;
  }

  version = ((SUidIdxVal *)pData)[0].version;

  if (tdbTbGet(pMeta->pTbDb, &(STbDbKey){.uid = uid, .version = version}, sizeof(STbDbKey), &pData, &nData) < 0) {
    goto _err;
  }

  // Start from a clean entry on every pass (the child-table path loops back).
  memset(&me, 0, sizeof(me));
  tDecoderInit(&dc, pData, nData);
  if (metaDecodeEntry(&dc, &me) < 0) {
    goto _err;
  }

  if (me.type == TSDB_SUPER_TABLE) {
    if (sver == -1 || sver == me.stbEntry.schemaRow.version) {
      pSchema = tCloneSSchemaWrapper(&me.stbEntry.schemaRow);
      goto _exit;
    }
    {  // Traverse to find the previous qualified data
      TBC *pCur;
      if (tdbTbcOpen(pMeta->pTbDb, &pCur, NULL) < 0) {
        goto _err;
      }
      STbDbKey key = {.version = sver, .uid = INT64_MAX};
      int      c = 0;
      tdbTbcMoveTo(pCur, &key, sizeof(key), &c);
      if (c < 0) {
        tdbTbcMoveToPrev(pCur);
      }

      void *pKey = NULL;
      void *pVal = NULL;
      int   vLen = 0, kLen = 0;
      while (1) {
        int32_t ret = tdbTbcPrev(pCur, &pKey, &kLen, &pVal, &vLen);
        if (ret < 0) break;

        STbDbKey *tmp = (STbDbKey *)pKey;
        if (tmp->uid != uid) {
          continue;
        }
        SDecoder   dcNew = {0};
        SMetaEntry meNew = {0};
        tDecoderInit(&dcNew, pVal, vLen);
        metaDecodeEntry(&dcNew, &meNew);
        pSchema = tCloneSSchemaWrapper(&meNew.stbEntry.schemaRow);
        tDecoderClear(&dcNew);
        tdbTbcClose(pCur);
        tdbFree(pKey);
        tdbFree(pVal);
        goto _exit;
      }
      tdbFree(pKey);
      tdbFree(pVal);
      tdbTbcClose(pCur);
    }
  } else if (me.type == TSDB_CHILD_TABLE) {
    // The schema lives on the super table: redo the lookup with its uid.
    uid = me.ctbEntry.suid;
    tDecoderClear(&dc);
    goto _query;
  } else {
    if (sver == -1 || sver == me.ntbEntry.schemaRow.version) {
      pSchema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow);
      goto _exit;
    }
  }
  tDecoderClear(&dc);

  // query from skm db
  if (tdbTbGet(pMeta->pSkmDb, &(SSkmDbKey){.uid = uid, .sver = sver}, sizeof(SSkmDbKey), &pData, &nData) < 0) {
    goto _err;
  }

  tDecoderInit(&dc, pData, nData);
  tDecodeSSchemaWrapperEx(&dc, &schema);
  pSchema = tCloneSSchemaWrapper(&schema);

_exit:
  // Single place where the decoder is cleared on the success paths; the
  // clones above are taken before this point.
  tDecoderClear(&dc);
  metaULock(pMeta);
  tdbFree(pData);
  return pSchema;

_err:
  tDecoderClear(&dc);
  metaULock(pMeta);
  tdbFree(pData);
  return NULL;
}

H
Hongze Cheng 已提交
366 367 368
// Append to `uidList` the uid of every ttl.idx entry visited while walking
// backwards from the position sought with key (dtime = ttl, uid = INT64_MAX).
//
// Returns the tdbTbcOpen() error if the cursor cannot be opened, 0 otherwise.
//
// The key buffer is handed back to tdbTbcPrev() on every iteration and freed
// exactly once after the loop, like the other cursor loops in this file. The
// earlier version freed it inside the loop without resetting the pointer, so
// the next tdbTbcPrev() call received a dangling buffer.
int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList) {
  TBC *pCur;
  int  ret = tdbTbcOpen(pMeta->pTtlIdx, &pCur, NULL);
  if (ret < 0) {
    return ret;
  }

  STtlIdxKey ttlKey = {0};
  ttlKey.dtime = ttl;
  ttlKey.uid = INT64_MAX;
  int c = 0;
  tdbTbcMoveTo(pCur, &ttlKey, sizeof(ttlKey), &c);
  if (c < 0) {
    tdbTbcMoveToPrev(pCur);
  }

  void *pKey = NULL;
  int   kLen = 0;
  while (tdbTbcPrev(pCur, &pKey, &kLen, NULL, NULL) >= 0) {
    ttlKey = *(STtlIdxKey *)pKey;
    taosArrayPush(uidList, &ttlKey.uid);
  }

  tdbFree(pKey);
  tdbTbcClose(pCur);
  return 0;
}

H
Hongze Cheng 已提交
397
struct SMCtbCursor {
H
Hongze Cheng 已提交
398 399
  SMeta   *pMeta;
  TBC     *pCur;
H
Hongze Cheng 已提交
400
  tb_uid_t suid;
H
Hongze Cheng 已提交
401 402
  void    *pKey;
  void    *pVal;
H
Hongze Cheng 已提交
403 404 405 406
  int      kLen;
  int      vLen;
};

H
Hongze Cheng 已提交
407 408
// Open a cursor over ctb.idx sought to key (suid = uid, uid = INT64_MIN).
//
// The meta read lock is taken here and stays held until
// metaCloseCtbCursor(). Returns NULL (with the lock released) when
// allocation or opening the tdb cursor fails.
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
  int cmp = 0;

  SMCtbCursor *pCursor = (SMCtbCursor *)taosMemoryCalloc(1, sizeof(SMCtbCursor));
  if (pCursor == NULL) {
    return NULL;
  }

  pCursor->pMeta = pMeta;
  pCursor->suid = uid;
  metaRLock(pMeta);

  if (tdbTbcOpen(pMeta->pCtbIdx, &pCursor->pCur, NULL) < 0) {
    metaULock(pMeta);
    taosMemoryFree(pCursor);
    return NULL;
  }

  // Seek to the smallest possible key for this super table.
  SCtbIdxKey seekKey;
  seekKey.suid = uid;
  seekKey.uid = INT64_MIN;
  tdbTbcMoveTo(pCursor->pCur, &seekKey, sizeof(seekKey), &cmp);
  if (cmp > 0) {
    // NOTE(review): cmp presumably tells how the sought key compares with the
    // entry the cursor landed on; confirm against tdbTbcMoveTo.
    tdbTbcMoveToNext(pCursor->pCur);
  }

  return pCursor;
}

C
Cary Xu 已提交
440
// Close a child-table cursor: release the meta lock, close the tdb cursor and
// free the key/value buffers (only when a tdb cursor is present), then free
// the cursor itself. A NULL cursor is ignored.
void metaCloseCtbCursor(SMCtbCursor *pCtbCur) {
  if (pCtbCur == NULL) {
    return;
  }

  if (pCtbCur->pMeta != NULL) {
    metaULock(pCtbCur->pMeta);
  }

  if (pCtbCur->pCur != NULL) {
    tdbTbcClose(pCtbCur->pCur);

    tdbFree(pCtbCur->pKey);
    tdbFree(pCtbCur->pVal);
  }

  taosMemoryFree(pCtbCur);
}

// Fetch the next ctb.idx entry and return its child table uid.
//
// Returns 0 when the tdb cursor cannot advance or when the fetched entry
// belongs to a super table uid greater than the one the cursor was opened
// for.
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
  if (tdbTbcNext(pCtbCur->pCur, &pCtbCur->pKey, &pCtbCur->kLen, &pCtbCur->pVal, &pCtbCur->vLen) < 0) {
    return 0;
  }

  SCtbIdxKey *pEntry = pCtbCur->pKey;
  if (pEntry->suid > pCtbCur->suid) {
    return 0;
  }

  return pEntry->uid;
}

C
Cary Xu 已提交
471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487
// Cursor over the super-table uid index (suid.idx).
struct SMStbCursor {
  SMeta   *pMeta;  // meta handle; its read lock is held while the cursor is open
  TBC     *pCur;   // underlying tdb cursor on pMeta->pSuidIdx
  tb_uid_t suid;   // uid the cursor was opened with
  void    *pKey;   // key buffer filled by tdbTbcNext, freed on close
  void    *pVal;   // value buffer filled by tdbTbcNext, freed on close
  int      kLen;   // length of pKey
  int      vLen;   // length of pVal
};

// Open a cursor over suid.idx sought to `suid`.
//
// The meta read lock is taken here and stays held until
// metaCloseStbCursor(). On allocation failure or when the tdb cursor cannot
// be opened, terrno is set to TSDB_CODE_OUT_OF_MEMORY, the lock (if taken) is
// released and NULL is returned.
SMStbCursor *metaOpenStbCursor(SMeta *pMeta, tb_uid_t suid) {
  int cmp = 0;

  SMStbCursor *pCursor = (SMStbCursor *)taosMemoryCalloc(1, sizeof(SMStbCursor));
  if (pCursor == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pCursor->pMeta = pMeta;
  pCursor->suid = suid;
  metaRLock(pMeta);

  if (tdbTbcOpen(pMeta->pSuidIdx, &pCursor->pCur, NULL) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    metaULock(pMeta);
    taosMemoryFree(pCursor);
    return NULL;
  }

  // Seek to the requested super table uid.
  tdbTbcMoveTo(pCursor->pCur, &suid, sizeof(suid), &cmp);
  if (cmp > 0) {
    tdbTbcMoveToNext(pCursor->pCur);
  }

  return pCursor;
}

// Close a super-table cursor: release the meta lock, close the tdb cursor and
// free the key/value buffers (only when a tdb cursor is present), then free
// the cursor itself. A NULL cursor is ignored.
void metaCloseStbCursor(SMStbCursor *pStbCur) {
  if (pStbCur == NULL) {
    return;
  }

  if (pStbCur->pMeta != NULL) {
    metaULock(pStbCur->pMeta);
  }

  if (pStbCur->pCur != NULL) {
    tdbTbcClose(pStbCur->pCur);

    tdbFree(pStbCur->pKey);
    tdbFree(pStbCur->pVal);
  }

  taosMemoryFree(pStbCur);
}

// Fetch the next suid.idx entry and return the uid stored in its key, or 0
// when the tdb cursor cannot advance.
tb_uid_t metaStbCursorNext(SMStbCursor *pStbCur) {
  int rc = tdbTbcNext(pStbCur->pCur, &pStbCur->pKey, &pStbCur->kLen, &pStbCur->pVal, &pStbCur->vLen);

  if (rc >= 0) {
    return *(tb_uid_t *)pStbCur->pKey;
  }
  return 0;
}

H
Hongze Cheng 已提交
537
// Build an STSchema for table `uid` at schema version `sver`.
//
// The column list comes from metaGetTableSchema(); each column is fed to a
// schema builder and the wrapper is freed once the STSchema has been built.
// Returns NULL when the schema wrapper cannot be obtained; otherwise returns
// whatever tdGetSchemaFromBuilder() produces.
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
  SSchemaWrapper *pWrapper = metaGetTableSchema(pMeta, uid, sver, 0);
  if (pWrapper == NULL) {
    return NULL;
  }

  STSchemaBuilder builder = {0};
  tdInitTSchemaBuilder(&builder, pWrapper->version);

  for (int iCol = 0; iCol < pWrapper->nCols; iCol++) {
    SSchema *pCol = &pWrapper->pSchema[iCol];
    tdAddColToSchema(&builder, pCol->type, pCol->flags, pCol->colId, pCol->bytes);
  }

  STSchema *pResult = tdGetSchemaFromBuilder(&builder);
  tdDestroyTSchemaBuilder(&builder);

  // The wrapper was cloned for us; release both the column array and the shell.
  taosMemoryFree(pWrapper->pSchema);
  taosMemoryFree(pWrapper);

  return pResult;
}

H
Hongze Cheng 已提交
561
// Build an STSchema for a table from the schema db and store it in
// *ppTSchema.
//
// The schema db key uses `suid` when it is non-zero and `uid` otherwise.
// When `sver` is <= 0 the version is resolved first: from metaGetInfo() if
// that succeeds, else by seeking the schema db to (uid, INT32_MAX) and taking
// the version of the entry found there.
//
// Returns 0 on success. Returns TSDB_CODE_NOT_FOUND when no matching schema
// entry exists, and TSDB_CODE_OUT_OF_MEMORY when the schema db cursor cannot
// be opened. *ppTSchema is only written on success.
int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema **ppTSchema) {
  int32_t code = 0;

  void     *pData = NULL;
  int       nData = 0;
  SSkmDbKey skmDbKey;
  if (sver <= 0) {
    SMetaInfo info;
    if (metaGetInfo(pMeta, suid ? suid : uid, &info) == 0) {
      sver = info.skmVer;
    } else {
      TBC *pSkmDbC = NULL;
      int  c;

      skmDbKey.uid = suid ? suid : uid;
      skmDbKey.sver = INT32_MAX;

      // Previously unchecked: a failed open left pSkmDbC NULL for the calls below.
      if (tdbTbcOpen(pMeta->pSkmDb, &pSkmDbC, NULL) < 0) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _exit;
      }
      metaRLock(pMeta);

      if (tdbTbcMoveTo(pSkmDbC, &skmDbKey, sizeof(skmDbKey), &c) < 0) {
        metaULock(pMeta);
        tdbTbcClose(pSkmDbC);
        code = TSDB_CODE_NOT_FOUND;
        goto _exit;
      }

      ASSERT(c);

      if (c < 0) {
        tdbTbcMoveToPrev(pSkmDbC);
      }

      const void *pKey = NULL;
      int32_t     nKey = 0;
      tdbTbcGet(pSkmDbC, &pKey, &nKey, NULL, NULL);

      // pKey stays NULL if tdbTbcGet did not produce a key; treat as not found.
      if (pKey == NULL || ((SSkmDbKey *)pKey)->uid != skmDbKey.uid) {
        metaULock(pMeta);
        tdbTbcClose(pSkmDbC);
        code = TSDB_CODE_NOT_FOUND;
        goto _exit;
      }

      sver = ((SSkmDbKey *)pKey)->sver;

      metaULock(pMeta);
      tdbTbcClose(pSkmDbC);
    }
  }

  ASSERT(sver > 0);

  skmDbKey.uid = suid ? suid : uid;
  skmDbKey.sver = sver;
  metaRLock(pMeta);
  if (tdbTbGet(pMeta->pSkmDb, &skmDbKey, sizeof(SSkmDbKey), &pData, &nData) < 0) {
    metaULock(pMeta);
    code = TSDB_CODE_NOT_FOUND;
    goto _exit;
  }
  metaULock(pMeta);

  // decode
  SDecoder        dc = {0};
  SSchemaWrapper  schema = {0};  // zeroed so a short decode cannot leave garbage fields
  SSchemaWrapper *pSchemaWrapper = &schema;

  tDecoderInit(&dc, pData, nData);
  tDecodeSSchemaWrapper(&dc, pSchemaWrapper);
  tDecoderClear(&dc);
  tdbFree(pData);

  // convert
  STSchemaBuilder sb = {0};

  tdInitTSchemaBuilder(&sb, pSchemaWrapper->version);
  for (int i = 0; i < pSchemaWrapper->nCols; i++) {
    SSchema *pSchema = pSchemaWrapper->pSchema + i;
    tdAddColToSchema(&sb, pSchema->type, pSchema->flags, pSchema->colId, pSchema->bytes);
  }
  STSchema *pTSchema = tdGetSchemaFromBuilder(&sb);
  tdDestroyTSchemaBuilder(&sb);

  *ppTSchema = pTSchema;
  taosMemoryFree(pSchemaWrapper->pSchema);

_exit:
  return code;
}

652 653
// N.B. Called by statusReq per second
// Return the table count kept in the vnode statistics: the number of child
// tables plus the number of normal tables.
int64_t metaGetTbNum(SMeta *pMeta) {
  int64_t nChildTables = pMeta->pVnode->config.vndStats.numOfCTables;
  int64_t nNormalTables = pMeta->pVnode->config.vndStats.numOfNTables;

  return nChildTables + nNormalTables;
}

// N.B. Called by statusReq per second
M
Minglei Jin 已提交
663
// Return the time-series count: the cached numOfTimeSeries plus
// numOfNTimeSeries from the vnode statistics.
//
// The cached value is recomputed through vnodeGetTimeSeriesNum() whenever it
// is <= 0, or on every 60th call otherwise (itvTimeSeries counts the calls
// and is reset to 0 after each recomputation).
// Per the original note: sum of (number of columns of stable - 1) * number of
// ctables (excluding timestamp column).
int64_t metaGetTimeSeriesNum(SMeta *pMeta) {
  // Short-circuit matters: the call counter is only bumped when a cached
  // value already exists.
  bool refresh = pMeta->pVnode->config.vndStats.numOfTimeSeries <= 0 ||
                 ++pMeta->pVnode->config.vndStats.itvTimeSeries % 60 == 0;

  if (refresh) {
    int64_t fresh = 0;
    vnodeGetTimeSeriesNum(pMeta->pVnode, &fresh);
    pMeta->pVnode->config.vndStats.numOfTimeSeries = fresh;

    pMeta->pVnode->config.vndStats.itvTimeSeries = 0;
  }

  return pMeta->pVnode->config.vndStats.numOfTimeSeries + pMeta->pVnode->config.vndStats.numOfNTimeSeries;
}
H
Hongze Cheng 已提交
675

C
Cary Xu 已提交
676
typedef struct {
H
Hongze Cheng 已提交
677 678
  SMeta   *pMeta;
  TBC     *pCur;
C
Cary Xu 已提交
679
  tb_uid_t uid;
H
Hongze Cheng 已提交
680 681
  void    *pKey;
  void    *pVal;
C
Cary Xu 已提交
682 683 684 685 686 687 688 689 690 691 692 693 694
  int      kLen;
  int      vLen;
} SMSmaCursor;

// Open a cursor over sma.idx sought to key (uid = uid, smaUid = INT64_MIN).
//
// The meta read lock is taken here and stays held until
// metaCloseSmaCursor(). Returns NULL when allocation fails (terrno set to
// TSDB_CODE_OUT_OF_MEMORY) or when the tdb cursor cannot be opened (lock
// released first).
SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) {
  SMSmaCursor *pSmaCur = NULL;
  SSmaIdxKey   smaIdxKey;
  int          ret;
  // Initialised like the other cursor openers in this file, so the check
  // below never reads an indeterminate value if tdbTbcMoveTo leaves it alone.
  int          c = 0;

  pSmaCur = (SMSmaCursor *)taosMemoryCalloc(1, sizeof(*pSmaCur));
  if (pSmaCur == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pSmaCur->pMeta = pMeta;
  pSmaCur->uid = uid;
  metaRLock(pMeta);

  ret = tdbTbcOpen(pMeta->pSmaIdx, &pSmaCur->pCur, NULL);
  if (ret < 0) {
    metaULock(pMeta);
    taosMemoryFree(pSmaCur);
    return NULL;
  }

  // move to the smallest possible key for this uid
  smaIdxKey.uid = uid;
  smaIdxKey.smaUid = INT64_MIN;
  tdbTbcMoveTo(pSmaCur->pCur, &smaIdxKey, sizeof(smaIdxKey), &c);
  if (c > 0) {
    tdbTbcMoveToNext(pSmaCur->pCur);
  }

  return pSmaCur;
}
H
Hongze Cheng 已提交
719

C
Cary Xu 已提交
720 721 722 723
// Close an SMA cursor: release the meta lock, close the tdb cursor and free
// the key/value buffers (only when a tdb cursor is present), then free the
// cursor itself. A NULL cursor is ignored.
void metaCloseSmaCursor(SMSmaCursor *pSmaCur) {
  if (pSmaCur == NULL) {
    return;
  }

  if (pSmaCur->pMeta != NULL) {
    metaULock(pSmaCur->pMeta);
  }

  if (pSmaCur->pCur != NULL) {
    tdbTbcClose(pSmaCur->pCur);

    tdbFree(pSmaCur->pKey);
    tdbFree(pSmaCur->pVal);
  }

  taosMemoryFree(pSmaCur);
}

// Fetch the next sma.idx entry and return the table uid stored in its key.
//
// Returns 0 when the tdb cursor cannot advance or when the fetched entry's
// uid is greater than the uid the cursor was opened for. The fetched key
// stays available in pSmaCur->pKey for callers that need the smaUid.
tb_uid_t metaSmaCursorNext(SMSmaCursor *pSmaCur) {
  if (tdbTbcNext(pSmaCur->pCur, &pSmaCur->pKey, &pSmaCur->kLen, &pSmaCur->pVal, &pSmaCur->vLen) < 0) {
    return 0;
  }

  SSmaIdxKey *pEntry = pSmaCur->pKey;
  if (pEntry->uid > pSmaCur->uid) {
    return 0;
  }

  return pEntry->uid;
}

// Collect the SMA definitions attached to table `uid` into a newly allocated
// STSmaWrapper.
//
// SMA ids come from metaGetSmaIdsByTable(); ids whose entry cannot be loaded
// are logged and skipped. With `deepCopy` the expr / tagsFilter payloads are
// duplicated into fresh allocations; without it the lengths are zeroed and
// the pointers set to NULL.
//
// Returns NULL when the table has no SMA ids, when no entry could be loaded,
// or on allocation failure (terrno = TSDB_CODE_OUT_OF_MEMORY).
//
// Fixes relative to the earlier version:
//  - `mr` is zero-initialised before the first `goto _err`, so the cleanup
//    path never runs metaReaderClear() on an indeterminate reader.
//  - the tagsFilter copy only happens when tagsFilterLen > 0.
//  - in deep-copy mode, zero-length payload pointers are reset to NULL rather
//    than left aliasing the reader's entry.
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
  STSmaWrapper *pSW = NULL;
  SArray       *pSmaIds = NULL;
  SMetaReader   mr = {0};  // zeroed: metaReaderClear() is then a no-op for the lock

  if (!(pSmaIds = metaGetSmaIdsByTable(pMeta, uid))) {
    return NULL;
  }

  pSW = taosMemoryCalloc(1, sizeof(*pSW));
  if (!pSW) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pSW->number = taosArrayGetSize(pSmaIds);
  pSW->tSma = taosMemoryCalloc(pSW->number, sizeof(STSma));

  if (!pSW->tSma) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  metaReaderInit(&mr, pMeta, 0);
  int64_t smaId;
  int     smaIdx = 0;
  STSma  *pTSma = NULL;
  for (int i = 0; i < pSW->number; ++i) {
    smaId = *(tb_uid_t *)taosArrayGet(pSmaIds, i);
    if (metaGetTableEntryByUid(&mr, smaId) < 0) {
      tDecoderClear(&mr.coder);
      metaWarn("vgId:%d, no entry for tbId:%" PRIi64 ", smaId:%" PRIi64, TD_VID(pMeta->pVnode), uid, smaId);
      continue;
    }
    // NOTE(review): the entry is read after tDecoderClear(); confirm that the
    // decoded tsma does not live in decoder-owned memory.
    tDecoderClear(&mr.coder);
    pTSma = pSW->tSma + smaIdx;
    memcpy(pTSma, mr.me.smaEntry.tsma, sizeof(STSma));
    if (deepCopy) {
      // The struct copy above duplicated the source pointers; drop them so the
      // wrapper only ever owns buffers allocated below.
      pTSma->expr = NULL;
      pTSma->tagsFilter = NULL;
      if (pTSma->exprLen > 0) {
        if (!(pTSma->expr = taosMemoryCalloc(1, pTSma->exprLen))) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          goto _err;
        }
        memcpy((void *)pTSma->expr, mr.me.smaEntry.tsma->expr, pTSma->exprLen);
      }
      if (pTSma->tagsFilterLen > 0) {
        if (!(pTSma->tagsFilter = taosMemoryCalloc(1, pTSma->tagsFilterLen))) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          goto _err;
        }
        memcpy((void *)pTSma->tagsFilter, mr.me.smaEntry.tsma->tagsFilter, pTSma->tagsFilterLen);
      }
    } else {
      pTSma->exprLen = 0;
      pTSma->expr = NULL;
      pTSma->tagsFilterLen = 0;
      pTSma->tagsFilter = NULL;
    }

    ++smaIdx;
  }

  if (smaIdx <= 0) goto _err;
  pSW->number = smaIdx;  // only the entries that were actually loaded

  metaReaderClear(&mr);
  taosArrayDestroy(pSmaIds);
  return pSW;
_err:
  metaReaderClear(&mr);
  taosArrayDestroy(pSmaIds);
  tFreeTSmaWrapper(pSW, deepCopy);
  return NULL;
}

C
Cary Xu 已提交
826
// Return a newly allocated copy (plain struct copy) of the STSma stored in
// the entry of `indexUid`.
//
// Returns NULL when the entry cannot be loaded (a warning is logged) or when
// the allocation fails (terrno = TSDB_CODE_OUT_OF_MEMORY).
STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
  STSma      *pResult = NULL;
  SMetaReader reader = {0};

  metaReaderInit(&reader, pMeta, 0);

  if (metaGetTableEntryByUid(&reader, indexUid) < 0) {
    metaWarn("vgId:%d, failed to get table entry for smaId:%" PRIi64, TD_VID(pMeta->pVnode), indexUid);
  } else {
    pResult = (STSma *)taosMemoryMalloc(sizeof(STSma));
    if (pResult == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(pResult, reader.me.smaEntry.tsma, sizeof(STSma));
    }
  }

  metaReaderClear(&reader);
  return pResult;
}

C
Cary Xu 已提交
848
// Collect the smaUid of every sma.idx entry the SMA cursor yields for table
// `uid`.
//
// The result array is created lazily on the first hit, so NULL is returned
// both when the cursor cannot be opened and when it yields nothing. On an
// allocation failure terrno is set to TSDB_CODE_OUT_OF_MEMORY and NULL is
// returned.
SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) {
  SArray *pResult = NULL;

  SMSmaCursor *pCursor = metaOpenSmaCursor(pMeta, uid);
  if (pCursor == NULL) {
    return NULL;
  }

  // metaSmaCursorNext() returns 0 once there is nothing more for this uid.
  while (metaSmaCursorNext(pCursor) != 0) {
    if (pResult == NULL) {
      pResult = taosArrayInit(16, sizeof(tb_uid_t));
      if (pResult == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        metaCloseSmaCursor(pCursor);
        return NULL;
      }
    }

    // The key just fetched by the cursor carries the SMA's own uid.
    SSmaIdxKey *pEntry = (SSmaIdxKey *)pCursor->pKey;

    // NOTE(review): the push result is compared with `< 0`; confirm
    // taosArrayPush's return type makes this test meaningful.
    if (taosArrayPush(pResult, &pEntry->smaUid) < 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      metaCloseSmaCursor(pCursor);
      taosArrayDestroy(pResult);
      return NULL;
    }
  }

  metaCloseSmaCursor(pCursor);
  return pResult;
}

C
Cary Xu 已提交
886
// Collect the table uids yielded by an SMA cursor opened with uid 0, skipping
// a uid that equals the one returned immediately before it.
//
// The result array is created lazily on the first hit, so NULL is returned
// both when the cursor cannot be opened and when it yields nothing. On an
// allocation failure terrno is set to TSDB_CODE_OUT_OF_MEMORY and NULL is
// returned.
SArray *metaGetSmaTbUids(SMeta *pMeta) {
  SArray  *pResult = NULL;
  tb_uid_t prevUid = 0;

  SMSmaCursor *pCursor = metaOpenSmaCursor(pMeta, 0);
  if (pCursor == NULL) {
    return NULL;
  }

  for (;;) {
    tb_uid_t uid = metaSmaCursorNext(pCursor);
    if (uid == 0) {
      break;
    }

    // Consecutive entries of the same table collapse into one result.
    if (uid == prevUid) {
      continue;
    }
    prevUid = uid;

    if (pResult == NULL) {
      pResult = taosArrayInit(16, sizeof(tb_uid_t));
      if (pResult == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        metaCloseSmaCursor(pCursor);
        return NULL;
      }
    }

    // NOTE(review): the push result is compared with `< 0`; confirm
    // taosArrayPush's return type makes this test meaningful.
    if (taosArrayPush(pResult, &uid) < 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      metaCloseSmaCursor(pCursor);
      taosArrayDestroy(pResult);
      return NULL;
    }
  }

  metaCloseSmaCursor(pCursor);
  return pResult;
}

L
Liu Jicong 已提交
929
#endif
H
Hongze Cheng 已提交
930

931
/**
 * Fetch one tag value out of an encoded tag blob.
 *
 * @param pTag  tag data, interpreted as STag *
 * @param type  tag column type; for TSDB_DATA_TYPE_JSON the tag blob itself
 *              is returned and val is left untouched
 * @param val   in/out argument forwarded to tTagGet()
 *              (presumably val->cid selects the tag - confirm against tTagGet)
 * @return pTag for JSON tags, val when tTagGet() reports the tag as found,
 *         NULL otherwise.
 *
 * With TAG_FILTER_DEBUG defined the found value and every value stored in
 * the tag are additionally written to the debug log.
 */
const void *metaGetTableTagVal(void *pTag, int16_t type, STagVal *val) {
  STag *tag = (STag *)pTag;
  if (type == TSDB_DATA_TYPE_JSON) {
    return tag;
  }

  bool find = tTagGet(tag, val);
  if (!find) {
    return NULL;
  }

#ifdef TAG_FILTER_DEBUG
  if (IS_VAR_DATA_TYPE(val->type)) {
    // copy into a zero-filled buffer so the payload can be printed with %s
    char *buf = taosMemoryCalloc(val->nData + 1, 1);
    if (buf != NULL) {
      memcpy(buf, val->pData, val->nData);
      metaDebug("metaTag table val varchar index:%d cid:%d type:%d value:%s", 1, val->cid, val->type, buf);
      taosMemoryFree(buf);
    }
  } else {
    double dval = 0;
    GET_TYPED_DATA(dval, double, val->type, &val->i64);
    metaDebug("metaTag table val number index:%d cid:%d type:%d value:%f", 1, val->cid, val->type, dval);
  }

  SArray *pTagVals = NULL;
  tTagToValArray((STag *)pTag, &pTagVals);
  for (int i = 0; i < taosArrayGetSize(pTagVals); i++) {
    STagVal *pTagVal = (STagVal *)taosArrayGet(pTagVals, i);

    if (IS_VAR_DATA_TYPE(pTagVal->type)) {
      char *buf = taosMemoryCalloc(pTagVal->nData + 1, 1);
      if (buf != NULL) {
        memcpy(buf, pTagVal->pData, pTagVal->nData);
        metaDebug("metaTag table varchar index:%d cid:%d type:%d value:%s", i, pTagVal->cid, pTagVal->type, buf);
        taosMemoryFree(buf);
      }
    } else {
      double dval = 0;
      GET_TYPED_DATA(dval, double, pTagVal->type, &pTagVal->i64);
      metaDebug("metaTag table number index:%d cid:%d type:%d value:%f", i, pTagVal->cid, pTagVal->type, dval);
    }
  }
  // the array filled by tTagToValArray() is only needed for the dump above;
  // release it here so the debug build does not leak one array per call
  taosArrayDestroy(pTagVals);
#endif

  return val;
}
wmmhello's avatar
wmmhello 已提交
974 975

typedef struct {
H
Hongze Cheng 已提交
976 977
  SMeta   *pMeta;
  TBC     *pCur;
wmmhello's avatar
wmmhello 已提交
978 979 980
  tb_uid_t suid;
  int16_t  cid;
  int16_t  type;
H
Hongze Cheng 已提交
981 982
  void    *pKey;
  void    *pVal;
wmmhello's avatar
wmmhello 已提交
983 984 985 986
  int32_t  kLen;
  int32_t  vLen;
} SIdxCursor;

dengyihao's avatar
dengyihao 已提交
987 988 989
/**
 * Walk the tag index and append to pUids the uid of every entry accepted by
 * param->filterFunc.
 *
 * The scan starts at the key built from (param->suid, param->cid, param->val)
 * and moves backwards when param->reverse is set, forwards otherwise. It
 * stops when the cursor runs out of entries, when an entry belongs to another
 * suid, when an entry of another type shows up after at least one entry of
 * the requested type was seen, or when the filter asks to stop.
 *
 * param->filterFunc(entryData, keyData, type) is interpreted as:
 *   0      entry matches, its uid is collected
 *   1      entry does not match, keep iterating
 *   other  entry does not match and no further match is possible
 *
 * @param pMeta  meta handle; its read lock is held for the duration of the scan
 * @param param  filter description; param->val must not be NULL
 * @param pUids  array of tb_uid_t that receives the matching uids
 * @return 0 when the scan ran, a negative value when the cursor could not be
 *         set up, param->val is NULL or memory ran out, or the non-zero
 *         result of metaCreateTagIdxKey().
 */
int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
  int32_t ret = 0;
  char   *buf = NULL;

  STagIdxKey *pKey = NULL;
  int32_t     nKey = 0;

  int32_t maxSize = 0;
  int32_t nTagData = 0;
  void   *tagData = NULL;

  int     cmp = 0;
  bool    first = true;
  int32_t valid = 0;

  SIdxCursor *pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor));
  if (pCursor == NULL) {
    // nothing has been acquired yet, so a plain return is safe here
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pCursor->pMeta = pMeta;
  pCursor->suid = param->suid;
  pCursor->cid = param->cid;
  pCursor->type = param->type;

  metaRLock(pMeta);
  ret = tdbTbcOpen(pMeta->pTagIdx, &pCursor->pCur, NULL);
  if (ret < 0) {
    goto END;
  }

  if (param->val == NULL) {
    metaError("vgId:%d, failed to filter NULL data", TD_VID(pMeta->pVnode));
    // leave through END: the read lock is held and the cursor is open, a
    // direct return would leak both (and pCursor)
    ret = -1;
    goto END;
  }

  if (IS_VAR_DATA_TYPE(param->type)) {
    tagData = varDataVal(param->val);
    nTagData = varDataLen(param->val);

    if (param->type == TSDB_DATA_TYPE_NCHAR) {
      // convert the multi-byte input to UCS-4 before building the index key
      maxSize = 4 * nTagData + 1;
      buf = taosMemoryCalloc(1, maxSize);
      if (buf == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        ret = -1;
        goto END;
      }
      if (false == taosMbsToUcs4(tagData, nTagData, (TdUcs4 *)buf, maxSize, &maxSize)) {
        // NOTE(review): ret is not updated on this path, so the caller sees
        // the status of tdbTbcOpen() - confirm this is intended
        goto END;
      }

      tagData = buf;
      nTagData = maxSize;
    }
  } else {
    tagData = param->val;
    nTagData = tDataTypes[param->type].bytes;
  }

  ret = metaCreateTagIdxKey(pCursor->suid, pCursor->cid, tagData, nTagData, pCursor->type,
                            param->reverse ? INT64_MAX : INT64_MIN, &pKey, &nKey);
  if (ret != 0) {
    goto END;
  }

  if (tdbTbcMoveTo(pCursor->pCur, pKey, nKey, &cmp) < 0) {
    // NOTE(review): ret stays 0 on this path - confirm this is intended
    goto END;
  }

  while (1) {
    void   *entryKey = NULL, *entryVal = NULL;
    int32_t nEntryKey, nEntryVal;

    valid = tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, (const void **)&entryVal, &nEntryVal);
    if (valid < 0) {
      break;
    }

    STagIdxKey *p = entryKey;
    if (p->type != pCursor->type) {
      if (!first) {
        break;
      }
      // no entry of the requested type seen yet: step over the foreign entry
      valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
      if (valid < 0) break;
      continue;
    }
    if (p->suid != pKey->suid) {
      break;
    }
    first = false;

    int32_t fltRes = (*param->filterFunc)(p->data, pKey->data, pKey->type);
    if (fltRes == 0) {
      // match: the table uid is stored right behind the tag value
      tb_uid_t tuid = 0;
      if (IS_VAR_DATA_TYPE(pKey->type)) {
        tuid = *(tb_uid_t *)(p->data + varDataTLen(p->data));
      } else {
        tuid = *(tb_uid_t *)(p->data + tDataTypes[pCursor->type].bytes);
      }
      if (taosArrayPush(pUids, &tuid) == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        ret = -1;
        break;
      }
    } else if (fltRes == 1) {
      // not match but should continue to iter
    } else {
      // not match and no more result
      break;
    }

    valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
    if (valid < 0) {
      break;
    }
  }

END:
  if (pCursor->pMeta) metaULock(pCursor->pMeta);
  if (pCursor->pCur) tdbTbcClose(pCursor->pCur);
  taosMemoryFree(buf);
  taosMemoryFree(pKey);

  taosMemoryFree(pCursor);

  return ret;
}
H
Hongze Cheng 已提交
1103

1104
/**
 * Copy the value stored for each child table of super table suid into the
 * tags hash, keyed by the child table uid.
 *
 * uidList works in two modes:
 *   - non-empty on entry: only the uids already listed are copied
 *   - empty on entry:     every child table is copied and its uid is
 *                         appended to uidList
 *
 * @param pMeta    meta handle
 * @param suid     super table uid handed to metaOpenCtbCursor()
 * @param uidList  array of uids, see the two modes above
 * @param tags     hash receiving uid -> (pCur->pVal, pCur->vLen) copies
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the cursor or
 *         the lookup hash cannot be created.
 */
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj *tags) {
  SMCtbCursor *pCur = metaOpenCtbCursor(pMeta, suid);
  if (pCur == NULL) {
    // assumes metaOpenCtbCursor() reports failure with NULL, like
    // metaOpenSmaCursor() does - TODO confirm
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SHashObj *uHash = NULL;
  size_t    len = taosArrayGetSize(uidList);  // len > 0 means there already have uids
  if (len > 0) {
    uHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
    if (uHash == NULL) {
      // without the lookup set every child table would be skipped silently
      metaCloseCtbCursor(pCur);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    for (int i = 0; i < (int)len; i++) {
      int64_t *uid = taosArrayGet(uidList, i);
      taosHashPut(uHash, uid, sizeof(int64_t), &i, sizeof(i));
    }
  }

  while (1) {
    // a uid of 0 marks the end of the iteration
    tb_uid_t id = metaCtbCursorNext(pCur);
    if (id == 0) {
      break;
    }

    if (len > 0 && taosHashGet(uHash, &id, sizeof(int64_t)) == NULL) {
      continue;  // caller did not ask for this table
    } else if (len == 0) {
      taosArrayPush(uidList, &id);
    }

    taosHashPut(tags, &id, sizeof(int64_t), pCur->pVal, pCur->vLen);
  }

  taosHashCleanup(uHash);
  metaCloseCtbCursor(pCur);
  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
1135

H
Hongze Cheng 已提交
1136 1137
// Forward declaration of the meta cache lookup used by metaGetInfo(), which
// treats a return value of 0 as a cache hit with pInfo filled in.
int32_t metaCacheGet(SMeta *pMeta, int64_t uid, SMetaInfo *pInfo);

H
Hongze Cheng 已提交
1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174
/**
 * Resolve the meta info of table uid, trying the cache first and the uid
 * index second.
 *
 * A hit in the uid index is copied into pInfo and written back to the cache
 * under the write lock; a cache hit leaves pInfo as filled by metaCacheGet().
 *
 * @param pMeta  meta handle
 * @param uid    table uid to look up
 * @param pInfo  receives uid, suid, version and skmVer
 * @return 0 when the table is known, TSDB_CODE_NOT_FOUND otherwise.
 */
int32_t metaGetInfo(SMeta *pMeta, int64_t uid, SMetaInfo *pInfo) {
  int32_t rc = 0;
  void   *pVal = NULL;
  int     nVal = 0;

  // both lookups run under the read lock; the index is only consulted when
  // the cache misses (short-circuit evaluation)
  metaRLock(pMeta);
  bool cached = (metaCacheGet(pMeta, uid, pInfo) == 0);
  bool found = cached || (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pVal, &nVal) >= 0);
  metaULock(pMeta);

  if (!found) {
    rc = TSDB_CODE_NOT_FOUND;
  } else if (!cached) {
    SUidIdxVal *pIdxVal = (SUidIdxVal *)pVal;

    pInfo->uid = uid;
    pInfo->suid = pIdxVal->suid;
    pInfo->version = pIdxVal->version;
    pInfo->skmVer = pIdxVal->skmVer;

    // remember the result so the next lookup is served from the cache
    metaWLock(pMeta);
    metaCacheUpsert(pMeta, pInfo);
    metaULock(pMeta);
  }

  // pVal is still NULL on a cache hit or a miss in the index
  tdbFree(pVal);
  return rc;
}