metaQuery.c 27.5 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "meta.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
/**
 * Prepare a meta reader for use against `pMeta`.
 *
 * The reader is zeroed, then `pMeta` and `flags` are recorded on it and the
 * meta read lock is taken. metaReaderClear() releases that lock again.
 */
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) {
  memset(pReader, 0, sizeof(SMetaReader));

  pReader->pMeta = pMeta;
  pReader->flags = flags;

  metaRLock(pMeta);
}
H
Hongze Cheng 已提交
24

H
Hongze Cheng 已提交
25
/**
 * Release everything a meta reader holds.
 *
 * Unlocks the meta store when the reader is bound to one, clears the decoder
 * state and frees the value buffer fetched from tdb.
 */
void metaReaderClear(SMetaReader *pReader) {
  SMeta *pBound = pReader->pMeta;

  if (pBound != NULL) {
    metaULock(pBound);
  }

  tDecoderClear(&pReader->coder);
  tdbFree(pReader->pBuf);
}

H
Hongze Cheng 已提交
33
/**
 * Load and decode the table entry stored under (version, uid) in table.db.
 *
 * The raw value is fetched into pReader->pBuf and decoded into pReader->me.
 *
 * @return 0 on success; -1 when the key is missing (terrno is set to
 *         TSDB_CODE_PAR_TABLE_NOT_EXIST) or when decoding fails.
 */
int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t uid) {
  SMeta   *pStore = pReader->pMeta;
  STbDbKey key = {.version = version, .uid = uid};

  // Fetch the encoded entry from table.db.
  if (tdbTbGet(pStore->pTbDb, &key, sizeof(key), &pReader->pBuf, &pReader->szBuf) < 0) {
    terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
    return -1;
  }

  // Decode it in place into the reader's entry.
  tDecoderInit(&pReader->coder, pReader->pBuf, pReader->szBuf);
  return (metaDecodeEntry(&pReader->coder, &pReader->me) < 0) ? -1 : 0;
}

dengyihao's avatar
dengyihao 已提交
56
// int metaGetTableEntryByUidTest(void* meta, SArray *uidList) {
wmmhello's avatar
wmmhello 已提交
57
//
dengyihao's avatar
dengyihao 已提交
58 59 60 61 62
//   SArray* readerList = taosArrayInit(taosArrayGetSize(uidList), sizeof(SMetaReader));
//   SArray* uidVersion = taosArrayInit(taosArrayGetSize(uidList), sizeof(STbDbKey));
//   SMeta  *pMeta = meta;
//   int64_t version;
//   SHashObj *uHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
wmmhello's avatar
wmmhello 已提交
63
//
dengyihao's avatar
dengyihao 已提交
64 65 66 67 68 69 70 71 72 73
//   int64_t stt1 = taosGetTimestampUs();
//   for(int i = 0; i < taosArrayGetSize(uidList); i++) {
//     void* ppVal = NULL;
//     int vlen = 0;
//     uint64_t *  uid = taosArrayGet(uidList, i);
//     // query uid.idx
//     if (tdbTbGet(pMeta->pUidIdx, uid, sizeof(*uid), &ppVal, &vlen) < 0) {
//       continue;
//     }
//     version = *(int64_t *)ppVal;
wmmhello's avatar
wmmhello 已提交
74
//
dengyihao's avatar
dengyihao 已提交
75 76 77 78 79 80
//     STbDbKey tbDbKey = {.version = version, .uid = *uid};
//     taosArrayPush(uidVersion, &tbDbKey);
//     taosHashPut(uHash, uid, sizeof(int64_t), ppVal, sizeof(int64_t));
//   }
//   int64_t stt2 = taosGetTimestampUs();
//   qDebug("metaGetTableEntryByUidTest1 rows:%d, cost:%ld us", taosArrayGetSize(uidList), stt2-stt1);
wmmhello's avatar
wmmhello 已提交
81
//
dengyihao's avatar
dengyihao 已提交
82 83 84 85 86
//   TBC        *pCur = NULL;
//   tdbTbcOpen(pMeta->pTbDb, &pCur, NULL);
//   tdbTbcMoveToFirst(pCur);
//   void *pKey = NULL;
//   int   kLen = 0;
wmmhello's avatar
wmmhello 已提交
87
//
dengyihao's avatar
dengyihao 已提交
88 89 90 91 92 93 94 95 96 97
//   while(1){
//     SMetaReader pReader = {0};
//     int32_t ret = tdbTbcNext(pCur, &pKey, &kLen, &pReader.pBuf, &pReader.szBuf);
//     if (ret < 0) break;
//     STbDbKey *tmp = (STbDbKey*)pKey;
//     int64_t *ver = (int64_t*)taosHashGet(uHash, &tmp->uid, sizeof(int64_t));
//     if(ver == NULL || *ver != tmp->version) continue;
//     taosArrayPush(readerList, &pReader);
//   }
//   tdbTbcClose(pCur);
wmmhello's avatar
wmmhello 已提交
98
//
dengyihao's avatar
dengyihao 已提交
99 100 101 102 103
//   taosArrayClear(readerList);
//   int64_t stt3 = taosGetTimestampUs();
//   qDebug("metaGetTableEntryByUidTest2 rows:%d, cost:%ld us", taosArrayGetSize(uidList), stt3-stt2);
//   for(int i = 0; i < taosArrayGetSize(uidVersion); i++) {
//     SMetaReader pReader = {0};
wmmhello's avatar
wmmhello 已提交
104
//
dengyihao's avatar
dengyihao 已提交
105 106 107 108 109 110 111 112 113
//     STbDbKey *tbDbKey = taosArrayGet(uidVersion, i);
//     // query table.db
//     if (tdbTbGet(pMeta->pTbDb, tbDbKey, sizeof(STbDbKey), &pReader.pBuf, &pReader.szBuf) < 0) {
//       continue;
//     }
//     taosArrayPush(readerList, &pReader);
//   }
//   int64_t stt4 = taosGetTimestampUs();
//   qDebug("metaGetTableEntryByUidTest3 rows:%d, cost:%ld us", taosArrayGetSize(uidList), stt4-stt3);
wmmhello's avatar
wmmhello 已提交
114
//
dengyihao's avatar
dengyihao 已提交
115 116 117 118 119
//   for(int i = 0; i < taosArrayGetSize(readerList); i++){
//     SMetaReader* pReader  = taosArrayGet(readerList, i);
//     metaReaderInit(pReader, meta, 0);
//     // decode the entry
//     tDecoderInit(&pReader->coder, pReader->pBuf, pReader->szBuf);
wmmhello's avatar
wmmhello 已提交
120
//
dengyihao's avatar
dengyihao 已提交
121 122 123 124 125 126 127 128
//     if (metaDecodeEntry(&pReader->coder, &pReader->me) < 0) {
//     }
//     metaReaderClear(pReader);
//   }
//   int64_t stt5 = taosGetTimestampUs();
//   qDebug("metaGetTableEntryByUidTest4 rows:%d, cost:%ld us", taosArrayGetSize(readerList), stt5-stt4);
//   return 0;
// }
wmmhello's avatar
wmmhello 已提交
129

H
Hongze Cheng 已提交
130
/**
 * Report whether `uid` has an entry in uid.idx.
 *
 * Only the presence of the key is probed; no value is fetched.
 */
bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid) {
  return tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), NULL, NULL) >= 0;
}

H
Hongze Cheng 已提交
139
/**
 * Load the table entry for `uid` at the version recorded in uid.idx.
 *
 * @return the result of metaGetTableEntryByVersion(), or -1 with terrno set
 *         to TSDB_CODE_PAR_TABLE_NOT_EXIST when `uid` is not in uid.idx.
 */
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
  SMeta *pStore = pReader->pMeta;

  // Resolve uid -> version through uid.idx.
  if (tdbTbGet(pStore->pUidIdx, &uid, sizeof(uid), &pReader->pBuf, &pReader->szBuf) < 0) {
    terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
    return -1;
  }

  // Copy the version out first: the next lookup reuses pReader->pBuf.
  int64_t idxVersion = ((SUidIdxVal *)pReader->pBuf)->version;

  return metaGetTableEntryByVersion(pReader, idxVersion, uid);
}

H
Hongze Cheng 已提交
153
/**
 * Load the table entry for the table called `name`.
 *
 * The name (including its terminating NUL) is the key into name.idx, whose
 * value is read as the table uid.
 *
 * @return the result of metaGetTableEntryByUid(), or -1 with terrno set to
 *         TSDB_CODE_PAR_TABLE_NOT_EXIST when the name is unknown.
 */
int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
  SMeta *pStore = pReader->pMeta;

  // Resolve name -> uid through name.idx.
  if (tdbTbGet(pStore->pNameIdx, name, strlen(name) + 1, &pReader->pBuf, &pReader->szBuf) < 0) {
    terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
    return -1;
  }

  // Copy the uid out first: the next lookup reuses pReader->pBuf.
  tb_uid_t resolved = *(tb_uid_t *)pReader->pBuf;

  return metaGetTableEntryByUid(pReader, resolved);
}

H
Hongze Cheng 已提交
167
/**
 * Look up the uid of the table called `name` in name.idx.
 *
 * The lookup runs under the meta read lock.
 *
 * @return the uid stored for `name`, or 0 when the lookup does not succeed.
 */
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {
  tb_uid_t found = 0;
  void    *pValue = NULL;
  int      valueLen = 0;

  metaRLock(pMeta);

  int rc = tdbTbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pValue, &valueLen);
  if (rc == 0) {
    found = *(tb_uid_t *)pValue;
    tdbFree(pValue);
  }

  metaULock(pMeta);

  return found;
}

dengyihao's avatar
dengyihao 已提交
184
int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName) {
D
dapan1121 已提交
185
  SMetaReader mr = {0};
dengyihao's avatar
dengyihao 已提交
186
  metaReaderInit(&mr, (SMeta *)meta, 0);
D
dapan1121 已提交
187 188 189 190 191 192 193 194
  metaGetTableEntryByUid(&mr, uid);

  STR_TO_VARSTR(tbName, mr.me.name);
  metaReaderClear(&mr);

  return 0;
}

H
Hongze Cheng 已提交
195
/**
 * Advance the reader to the next entry.
 *
 * TODO: not implemented yet; currently always returns 0.
 */
int metaReadNext(SMetaReader *pReader) {
  SMeta *pStore = pReader->pMeta;
  (void)pStore;

  return 0;
}

#if 1  // ===================================================
H
Hongze Cheng 已提交
204 205 206 207 208 209 210 211
/**
 * Open a cursor over uid.idx, positioned at its first record.
 *
 * The embedded reader is initialised with metaReaderInit(), so the meta read
 * lock stays held until metaCloseTbCursor() is called.
 *
 * @return the new cursor, or NULL when allocation or opening the tdb cursor
 *         fails (in which case the read lock has been released again).
 */
SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
  SMTbCursor *pTbCur = (SMTbCursor *)taosMemoryCalloc(1, sizeof(*pTbCur));
  if (pTbCur == NULL) {
    return NULL;
  }

  metaReaderInit(&pTbCur->mr, pMeta, 0);

  // Do not hand out a cursor without a usable pDbc; undo the lock and
  // allocation instead, like the other cursor openers in this file.
  if (tdbTbcOpen(pMeta->pUidIdx, &pTbCur->pDbc, NULL) < 0) {
    metaReaderClear(&pTbCur->mr);
    taosMemoryFree(pTbCur);
    return NULL;
  }

  tdbTbcMoveToFirst(pTbCur->pDbc);

  return pTbCur;
}

/**
 * Destroy a cursor created by metaOpenTbCursor(). NULL is accepted.
 *
 * Frees the key/value buffers, clears the embedded reader (which releases
 * the meta read lock), closes the tdb cursor and frees the cursor itself.
 */
void metaCloseTbCursor(SMTbCursor *pTbCur) {
  if (pTbCur == NULL) {
    return;
  }

  tdbFree(pTbCur->pKey);
  tdbFree(pTbCur->pVal);
  metaReaderClear(&pTbCur->mr);

  if (pTbCur->pDbc != NULL) {
    tdbTbcClose(pTbCur->pDbc);
  }

  taosMemoryFree(pTbCur);
}

H
Hongze Cheng 已提交
233
/**
 * Advance the cursor to the next table that is not a super table.
 *
 * Each uid.idx record is resolved to its table entry, which is decoded into
 * pTbCur->mr.me. Super tables are skipped.
 *
 * @return 0 when pTbCur->mr.me holds the next table; -1 when the index is
 *         exhausted or a table entry cannot be loaded.
 */
int metaTbCursorNext(SMTbCursor *pTbCur) {
  for (;;) {
    if (tdbTbcNext(pTbCur->pDbc, &pTbCur->pKey, &pTbCur->kLen, &pTbCur->pVal, &pTbCur->vLen) < 0) {
      return -1;
    }

    // Drop the decoder state left over from the previous entry.
    tDecoderClear(&pTbCur->mr.coder);

    int64_t  entryVersion = ((SUidIdxVal *)pTbCur->pVal)[0].version;
    tb_uid_t entryUid = *(tb_uid_t *)pTbCur->pKey;

    // On failure mr.me still describes an older entry (or nothing at all),
    // so it must not be inspected or reported as the current row.
    if (metaGetTableEntryByVersion(&pTbCur->mr, entryVersion, entryUid) < 0) {
      return -1;
    }

    if (pTbCur->mr.me.type != TSDB_SUPER_TABLE) {
      return 0;
    }
  }
}

H
Hongze Cheng 已提交
257
/**
 * Return a heap-allocated copy of the schema of table `uid`.
 *
 * For a child table the lookup is redirected to its super table. When `sver`
 * is -1 or equals the version stored in the table entry, the entry's schema
 * is cloned; otherwise the requested version is read from the schema db.
 * The whole lookup runs under the meta read lock.
 *
 * @param pMeta    meta store to query.
 * @param uid      table uid.
 * @param sver     wanted schema version, or -1 for the one in the table entry.
 * @param isinline not used by this implementation.
 * @return the cloned schema (as produced by tCloneSSchemaWrapper), or NULL
 *         when any lookup or decode step fails.
 */
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) {
  void           *pData = NULL;
  int             nData = 0;
  int64_t         version;
  SSchemaWrapper  schema = {0};
  SSchemaWrapper *pSchema = NULL;
  SSchemaWrapper *pEntrySchema = NULL;
  SDecoder        dc = {0};
  SMetaEntry      me;

  (void)isinline;

  metaRLock(pMeta);

_query:
  // uid -> latest entry version
  if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData) < 0) {
    goto _exit;
  }
  version = ((SUidIdxVal *)pData)[0].version;

  // (uid, version) -> encoded table entry. Without this check a failed
  // lookup would leave the uid.idx value in pData and decode it as an entry.
  if (tdbTbGet(pMeta->pTbDb, &(STbDbKey){.uid = uid, .version = version}, sizeof(STbDbKey), &pData, &nData) < 0) {
    goto _exit;
  }

  memset(&me, 0, sizeof(me));  // start from a clean entry on every pass through _query
  tDecoderInit(&dc, pData, nData);
  if (metaDecodeEntry(&dc, &me) < 0) {
    tDecoderClear(&dc);
    goto _exit;
  }

  if (me.type == TSDB_CHILD_TABLE) {
    // A child table's schema is that of its super table: look that up instead.
    uid = me.ctbEntry.suid;
    tDecoderClear(&dc);
    goto _query;
  }

  pEntrySchema = (me.type == TSDB_SUPER_TABLE) ? &me.stbEntry.schemaRow : &me.ntbEntry.schemaRow;
  if (sver == -1 || sver == pEntrySchema->version) {
    pSchema = tCloneSSchemaWrapper(pEntrySchema);
    tDecoderClear(&dc);
    goto _exit;
  }
  tDecoderClear(&dc);

  // The entry holds a different version: fetch the requested one from skm db.
  if (tdbTbGet(pMeta->pSkmDb, &(SSkmDbKey){.uid = uid, .sver = sver}, sizeof(SSkmDbKey), &pData, &nData) < 0) {
    goto _exit;
  }

  tDecoderInit(&dc, pData, nData);
  if (tDecodeSSchemaWrapperEx(&dc, &schema) >= 0) {
    pSchema = tCloneSSchemaWrapper(&schema);
  }
  tDecoderClear(&dc);

_exit:
  // Success and failure share the same cleanup; pSchema stays NULL on failure.
  metaULock(pMeta);
  tdbFree(pData);
  return pSchema;
}

H
Hongze Cheng 已提交
317 318 319
/**
 * Collect table uids from ttl.idx, walking backwards from the position of
 * the key (dtime = ttl, uid = INT64_MAX).
 *
 * Every uid visited is appended to `uidList`.
 *
 * @return 0 on success, or the negative result of tdbTbcOpen() when the
 *         cursor cannot be opened.
 */
int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList) {
  TBC *pCur;
  int  rc = tdbTbcOpen(pMeta->pTtlIdx, &pCur, NULL);
  if (rc < 0) {
    return rc;
  }

  // Seek to the largest possible key carrying this ttl.
  STtlIdxKey ttlKey = {0};
  ttlKey.dtime = ttl;
  ttlKey.uid = INT64_MAX;

  int cmp = 0;
  tdbTbcMoveTo(pCur, &ttlKey, sizeof(ttlKey), &cmp);
  if (cmp < 0) {
    tdbTbcMoveToPrev(pCur);
  }

  // Walk towards the start of the index, recording each uid.
  void *pKey = NULL;
  int   kLen = 0;
  for (;;) {
    rc = tdbTbcPrev(pCur, &pKey, &kLen, NULL, NULL);
    if (rc < 0) {
      break;
    }

    ttlKey = *(STtlIdxKey *)pKey;
    taosArrayPush(uidList, &ttlKey.uid);
  }

  tdbTbcClose(pCur);
  tdbFree(pKey);

  return 0;
}

H
Hongze Cheng 已提交
350
struct SMCtbCursor {
H
Hongze Cheng 已提交
351 352
  SMeta   *pMeta;
  TBC     *pCur;
H
Hongze Cheng 已提交
353
  tb_uid_t suid;
H
Hongze Cheng 已提交
354 355
  void    *pKey;
  void    *pVal;
H
Hongze Cheng 已提交
356 357 358 359
  int      kLen;
  int      vLen;
};

H
Hongze Cheng 已提交
360 361
/**
 * Open a cursor over ctb.idx positioned for super table `uid`.
 *
 * The meta read lock is taken here and released by metaCloseCtbCursor().
 *
 * @return the new cursor, or NULL when allocation or opening the tdb cursor
 *         fails (the lock is released again in the latter case).
 */
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
  SMCtbCursor *pCursor = (SMCtbCursor *)taosMemoryCalloc(1, sizeof(*pCursor));
  if (!pCursor) {
    return NULL;
  }

  pCursor->suid = uid;
  pCursor->pMeta = pMeta;
  metaRLock(pMeta);

  int rc = tdbTbcOpen(pMeta->pCtbIdx, &pCursor->pCur, NULL);
  if (rc < 0) {
    metaULock(pMeta);
    taosMemoryFree(pCursor);
    return NULL;
  }

  // Seek to the smallest possible key of this super table.
  SCtbIdxKey seekKey;
  seekKey.suid = uid;
  seekKey.uid = INT64_MIN;

  int cmp = 0;
  tdbTbcMoveTo(pCursor->pCur, &seekKey, sizeof(seekKey), &cmp);
  if (cmp > 0) {
    tdbTbcMoveToNext(pCursor->pCur);
  }

  return pCursor;
}

C
Cary Xu 已提交
393
/**
 * Destroy a cursor created by metaOpenCtbCursor(). NULL is accepted.
 *
 * Releases the meta read lock, closes the tdb cursor, frees the key/value
 * buffers and finally the cursor itself.
 */
void metaCloseCtbCursor(SMCtbCursor *pCtbCur) {
  if (pCtbCur == NULL) {
    return;
  }

  if (pCtbCur->pMeta != NULL) {
    metaULock(pCtbCur->pMeta);
  }

  if (pCtbCur->pCur != NULL) {
    tdbTbcClose(pCtbCur->pCur);

    tdbFree(pCtbCur->pKey);
    tdbFree(pCtbCur->pVal);
  }

  taosMemoryFree(pCtbCur);
}

/**
 * Return the uid of the next child table under the cursor's super table.
 *
 * @return the child uid, or 0 when the index is exhausted or the next key
 *         has a suid greater than the cursor's.
 */
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
  if (tdbTbcNext(pCtbCur->pCur, &pCtbCur->pKey, &pCtbCur->kLen, &pCtbCur->pVal, &pCtbCur->vLen) < 0) {
    return 0;
  }

  SCtbIdxKey *pEntry = pCtbCur->pKey;

  // Keys past the requested super table end the iteration.
  return (pEntry->suid > pCtbCur->suid) ? 0 : pEntry->uid;
}

C
Cary Xu 已提交
424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440
/* Cursor over suid.idx, yielding super table uids (see metaOpenStbCursor). */
struct SMStbCursor {
  SMeta   *pMeta;  // meta store; its read lock is held while the cursor is open
  TBC     *pCur;   // tdb cursor opened on pMeta->pSuidIdx
  tb_uid_t suid;   // suid the cursor was opened with
  void    *pKey;   // key buffer filled by tdbTbcNext, released with tdbFree
  void    *pVal;   // value buffer filled by tdbTbcNext, released with tdbFree
  int      kLen;   // length of pKey as reported by tdbTbcNext
  int      vLen;   // length of pVal as reported by tdbTbcNext
};

/**
 * Open a cursor over suid.idx positioned at `suid`.
 *
 * The meta read lock is taken here and released by metaCloseStbCursor().
 *
 * @return the new cursor, or NULL with terrno set to
 *         TSDB_CODE_OUT_OF_MEMORY when allocation or opening the tdb cursor
 *         fails.
 */
SMStbCursor *metaOpenStbCursor(SMeta *pMeta, tb_uid_t suid) {
  SMStbCursor *pCursor = (SMStbCursor *)taosMemoryCalloc(1, sizeof(*pCursor));
  if (!pCursor) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pCursor->suid = suid;
  pCursor->pMeta = pMeta;
  metaRLock(pMeta);

  int rc = tdbTbcOpen(pMeta->pSuidIdx, &pCursor->pCur, NULL);
  if (rc < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    metaULock(pMeta);
    taosMemoryFree(pCursor);
    return NULL;
  }

  // Seek to the requested suid.
  int cmp = 0;
  tdbTbcMoveTo(pCursor->pCur, &suid, sizeof(suid), &cmp);
  if (cmp > 0) {
    tdbTbcMoveToNext(pCursor->pCur);
  }

  return pCursor;
}

/**
 * Destroy a cursor created by metaOpenStbCursor(). NULL is accepted.
 *
 * Releases the meta read lock, closes the tdb cursor, frees the key/value
 * buffers and finally the cursor itself.
 */
void metaCloseStbCursor(SMStbCursor *pStbCur) {
  if (pStbCur == NULL) {
    return;
  }

  if (pStbCur->pMeta != NULL) {
    metaULock(pStbCur->pMeta);
  }

  if (pStbCur->pCur != NULL) {
    tdbTbcClose(pStbCur->pCur);

    tdbFree(pStbCur->pKey);
    tdbFree(pStbCur->pVal);
  }

  taosMemoryFree(pStbCur);
}

/**
 * Return the next super table uid from the cursor.
 *
 * @return the uid read from the key, or 0 when the index is exhausted.
 */
tb_uid_t metaStbCursorNext(SMStbCursor *pStbCur) {
  int rc = tdbTbcNext(pStbCur->pCur, &pStbCur->pKey, &pStbCur->kLen, &pStbCur->pVal, &pStbCur->vLen);

  if (rc < 0) {
    return 0;
  }

  tb_uid_t *pSuid = (tb_uid_t *)pStbCur->pKey;
  return *pSuid;
}

H
Hongze Cheng 已提交
490
/**
 * Build an STSchema for table `uid` at schema version `sver`.
 *
 * The schema wrapper is obtained from metaGetTableSchema(), converted column
 * by column through an STSchemaBuilder, and then released.
 *
 * @return the schema produced by tdGetSchemaFromBuilder(), or NULL when the
 *         table schema cannot be found.
 */
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
  SSchemaWrapper *pWrapper = metaGetTableSchema(pMeta, uid, sver, 0);
  if (pWrapper == NULL) {
    return NULL;
  }

  STSchemaBuilder builder = {0};
  tdInitTSchemaBuilder(&builder, pWrapper->version);

  for (int iCol = 0; iCol < pWrapper->nCols; iCol++) {
    SSchema *pCol = &pWrapper->pSchema[iCol];
    tdAddColToSchema(&builder, pCol->type, pCol->flags, pCol->colId, pCol->bytes);
  }

  STSchema *pResult = tdGetSchemaFromBuilder(&builder);
  tdDestroyTSchemaBuilder(&builder);

  // The wrapper was only needed for the conversion.
  taosMemoryFree(pWrapper->pSchema);
  taosMemoryFree(pWrapper);

  return pResult;
}

H
Hongze Cheng 已提交
514
/**
 * Build an STSchema for a table from the schema db.
 *
 * The schema is keyed by `suid` when it is non-zero and by `uid` otherwise.
 * When `sver` is not positive, the version is taken from metaGetInfo(); if
 * that fails, the highest version stored in the schema db for the key is
 * used.
 *
 * @param pMeta     meta store to query.
 * @param suid      super table uid, or 0 to use `uid`.
 * @param uid       table uid, used when `suid` is 0.
 * @param sver      wanted schema version, or <= 0 for the latest.
 * @param ppTSchema receives the built schema on success; not written on
 *                  failure.
 * @return 0 on success, TSDB_CODE_NOT_FOUND when the schema cannot be
 *         located or read.
 */
int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema **ppTSchema) {
  int32_t         code = 0;
  void           *pData = NULL;
  int             nData = 0;
  SSkmDbKey       skmDbKey;
  SDecoder        dc = {0};
  SSchemaWrapper  schema = {0};
  STSchemaBuilder sb = {0};

  if (sver <= 0) {
    SMetaInfo info;
    if (metaGetInfo(pMeta, suid ? suid : uid, &info) == 0) {
      sver = info.skmVer;
    } else {
      TBC *pSkmDbC = NULL;
      int  c = 0;

      // Seek past every version of this uid, then step back to the newest.
      skmDbKey.uid = suid ? suid : uid;
      skmDbKey.sver = INT32_MAX;

      if (tdbTbcOpen(pMeta->pSkmDb, &pSkmDbC, NULL) < 0) {
        code = TSDB_CODE_NOT_FOUND;
        goto _exit;
      }
      metaRLock(pMeta);

      if (tdbTbcMoveTo(pSkmDbC, &skmDbKey, sizeof(skmDbKey), &c) < 0) {
        metaULock(pMeta);
        tdbTbcClose(pSkmDbC);
        code = TSDB_CODE_NOT_FOUND;
        goto _exit;
      }

      ASSERT(c);

      if (c < 0) {
        tdbTbcMoveToPrev(pSkmDbC);
      }

      const void *pKey = NULL;
      int32_t     nKey = 0;

      // The key is only usable when the cursor really yields one and it
      // belongs to the uid we asked for.
      if (tdbTbcGet(pSkmDbC, &pKey, &nKey, NULL, NULL) < 0 || pKey == NULL ||
          ((const SSkmDbKey *)pKey)->uid != skmDbKey.uid) {
        metaULock(pMeta);
        tdbTbcClose(pSkmDbC);
        code = TSDB_CODE_NOT_FOUND;
        goto _exit;
      }

      sver = ((const SSkmDbKey *)pKey)->sver;

      metaULock(pMeta);
      tdbTbcClose(pSkmDbC);
    }
  }

  ASSERT(sver > 0);

  skmDbKey.uid = suid ? suid : uid;
  skmDbKey.sver = sver;
  metaRLock(pMeta);
  if (tdbTbGet(pMeta->pSkmDb, &skmDbKey, sizeof(SSkmDbKey), &pData, &nData) < 0) {
    metaULock(pMeta);
    code = TSDB_CODE_NOT_FOUND;
    goto _exit;
  }
  metaULock(pMeta);

  // decode
  tDecoderInit(&dc, pData, nData);
  int32_t decodeRc = tDecodeSSchemaWrapper(&dc, &schema);
  tDecoderClear(&dc);
  tdbFree(pData);

  if (decodeRc < 0) {
    // `schema` starts zeroed, so pSchema is either NULL or whatever the
    // decoder allocated before it failed.
    // NOTE(review): no decode-specific error code is visible in this file,
    // so the not-found code is reused here — confirm.
    taosMemoryFree(schema.pSchema);
    code = TSDB_CODE_NOT_FOUND;
    goto _exit;
  }

  // convert
  tdInitTSchemaBuilder(&sb, schema.version);
  for (int i = 0; i < schema.nCols; i++) {
    SSchema *pSchema = schema.pSchema + i;
    tdAddColToSchema(&sb, pSchema->type, pSchema->flags, pSchema->colId, pSchema->bytes);
  }
  STSchema *pTSchema = tdGetSchemaFromBuilder(&sb);
  tdDestroyTSchemaBuilder(&sb);

  *ppTSchema = pTSchema;
  taosMemoryFree(schema.pSchema);

_exit:
  return code;
}

605 606
// N.B. Called by statusReq per second
int64_t metaGetTbNum(SMeta *pMeta) {
  // Child tables plus normal tables, taken from the vnode's cached
  // statistics; super tables and others are not counted.
  return pMeta->pVnode->config.vndStats.numOfNTables + pMeta->pVnode->config.vndStats.numOfCTables;
}

// N.B. Called by statusReq per second
M
Minglei Jin 已提交
616
int64_t metaGetTimeSeriesNum(SMeta *pMeta) {
  // Time series count: sum over super tables of
  // (number of columns - 1) * number of child tables; the timestamp column
  // is excluded. The fresh value is cached in the vnode statistics.
  int64_t nSeries = 0;

  vnodeGetTimeSeriesNum(pMeta->pVnode, &nSeries);
  pMeta->pVnode->config.vndStats.numOfTimeSeries = nSeries;

  return pMeta->pVnode->config.vndStats.numOfTimeSeries;
}
H
Hongze Cheng 已提交
624

C
Cary Xu 已提交
625
typedef struct {
H
Hongze Cheng 已提交
626 627
  SMeta   *pMeta;
  TBC     *pCur;
C
Cary Xu 已提交
628
  tb_uid_t uid;
H
Hongze Cheng 已提交
629 630
  void    *pKey;
  void    *pVal;
C
Cary Xu 已提交
631 632 633 634 635 636 637 638 639 640 641 642 643
  int      kLen;
  int      vLen;
} SMSmaCursor;

/**
 * Open a cursor over sma.idx positioned for table `uid`.
 *
 * The meta read lock is taken here and released by metaCloseSmaCursor().
 *
 * @return the new cursor, or NULL when allocation (terrno is set to
 *         TSDB_CODE_OUT_OF_MEMORY) or opening the tdb cursor fails.
 */
SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) {
  SMSmaCursor *pSmaCur = NULL;
  SSmaIdxKey   smaIdxKey;
  int          ret = 0;
  // Initialised like in the other cursor openers: tdbTbcMoveTo's result is
  // not checked, so `c` must hold a defined value if it is left unwritten.
  int          c = 0;

  pSmaCur = (SMSmaCursor *)taosMemoryCalloc(1, sizeof(*pSmaCur));
  if (pSmaCur == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pSmaCur->pMeta = pMeta;
  pSmaCur->uid = uid;
  metaRLock(pMeta);

  ret = tdbTbcOpen(pMeta->pSmaIdx, &pSmaCur->pCur, NULL);
  if (ret < 0) {
    metaULock(pMeta);
    taosMemoryFree(pSmaCur);
    return NULL;
  }

  // move to the smallest possible key of this table
  smaIdxKey.uid = uid;
  smaIdxKey.smaUid = INT64_MIN;
  tdbTbcMoveTo(pSmaCur->pCur, &smaIdxKey, sizeof(smaIdxKey), &c);
  if (c > 0) {
    tdbTbcMoveToNext(pSmaCur->pCur);
  }

  return pSmaCur;
}
H
Hongze Cheng 已提交
668

C
Cary Xu 已提交
669 670 671 672
/**
 * Destroy a cursor created by metaOpenSmaCursor(). NULL is accepted.
 *
 * Releases the meta read lock, closes the tdb cursor, frees the key/value
 * buffers and finally the cursor itself.
 */
void metaCloseSmaCursor(SMSmaCursor *pSmaCur) {
  if (pSmaCur == NULL) {
    return;
  }

  if (pSmaCur->pMeta != NULL) {
    metaULock(pSmaCur->pMeta);
  }

  if (pSmaCur->pCur != NULL) {
    tdbTbcClose(pSmaCur->pCur);

    tdbFree(pSmaCur->pKey);
    tdbFree(pSmaCur->pVal);
  }

  taosMemoryFree(pSmaCur);
}

/**
 * Advance the SMA cursor by one record.
 *
 * The record's key stays available in pSmaCur->pKey for the caller.
 *
 * @return the table uid of the record, or 0 when the index is exhausted or
 *         the record's uid is greater than the cursor's.
 */
tb_uid_t metaSmaCursorNext(SMSmaCursor *pSmaCur) {
  if (tdbTbcNext(pSmaCur->pCur, &pSmaCur->pKey, &pSmaCur->kLen, &pSmaCur->pVal, &pSmaCur->vLen) < 0) {
    return 0;
  }

  SSmaIdxKey *pEntry = pSmaCur->pKey;

  // Records past the requested table end the iteration.
  return (pEntry->uid > pSmaCur->uid) ? 0 : pEntry->uid;
}

/**
 * Collect the SMA definitions attached to table `uid`.
 *
 * @param pMeta    meta store to query.
 * @param uid      table whose SMA indexes are wanted.
 * @param deepCopy when true, `expr` and `tagsFilter` of every STSma are
 *                 copied into freshly allocated buffers; when false they are
 *                 returned as NULL with zero lengths.
 * @return a wrapper holding the SMAs whose entries could be loaded, or NULL
 *         when the table has none, none could be loaded, or memory runs out
 *         (terrno is then TSDB_CODE_OUT_OF_MEMORY).
 */
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
  STSmaWrapper *pSW = NULL;
  SArray       *pSmaIds = NULL;
  // Declared and zeroed before the first `goto _err`, so the cleanup below
  // always sees a well-defined reader (pMeta == NULL means "nothing locked").
  SMetaReader   mr = {0};
  int64_t       smaId;
  int           smaIdx = 0;
  STSma        *pTSma = NULL;

  if (!(pSmaIds = metaGetSmaIdsByTable(pMeta, uid))) {
    return NULL;
  }

  pSW = taosMemoryCalloc(1, sizeof(*pSW));
  if (!pSW) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pSW->number = taosArrayGetSize(pSmaIds);
  pSW->tSma = taosMemoryCalloc(pSW->number, sizeof(STSma));

  if (!pSW->tSma) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  metaReaderInit(&mr, pMeta, 0);

  for (int i = 0; i < pSW->number; ++i) {
    smaId = *(tb_uid_t *)taosArrayGet(pSmaIds, i);
    if (metaGetTableEntryByUid(&mr, smaId) < 0) {
      tDecoderClear(&mr.coder);
      metaWarn("vgId:%d, no entry for tbId:%" PRIi64 ", smaId:%" PRIi64, TD_VID(pMeta->pVnode), uid, smaId);
      continue;
    }

    const STSma *pSrc = mr.me.smaEntry.tsma;

    pTSma = pSW->tSma + smaIdx;
    memcpy(pTSma, pSrc, sizeof(STSma));

    // Never leave pointers borrowed from the reader in the result: they
    // would dangle after the reader is cleared and could be handed to
    // tFreeTSmaWrapper() on the error path.
    pTSma->expr = NULL;
    pTSma->tagsFilter = NULL;

    if (deepCopy) {
      if (pTSma->exprLen > 0) {
        if (!(pTSma->expr = taosMemoryCalloc(1, pTSma->exprLen))) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          goto _err;
        }
        memcpy((void *)pTSma->expr, pSrc->expr, pTSma->exprLen);
      }
      if (pTSma->tagsFilterLen > 0) {
        if (!(pTSma->tagsFilter = taosMemoryCalloc(1, pTSma->tagsFilterLen))) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          goto _err;
        }
        memcpy((void *)pTSma->tagsFilter, pSrc->tagsFilter, pTSma->tagsFilterLen);
      }
    } else {
      pTSma->exprLen = 0;
      pTSma->tagsFilterLen = 0;
    }

    // Clear the decoder only after the entry has been copied out.
    // NOTE(review): the decoded entry presumably lives in decoder-owned
    // memory — confirm. Clearing after the copy is correct either way.
    tDecoderClear(&mr.coder);

    ++smaIdx;
  }

  if (smaIdx <= 0) goto _err;
  pSW->number = smaIdx;

  metaReaderClear(&mr);
  taosArrayDestroy(pSmaIds);
  return pSW;

_err:
  metaReaderClear(&mr);
  taosArrayDestroy(pSmaIds);
  tFreeTSmaWrapper(pSW, deepCopy);
  return NULL;
}

C
Cary Xu 已提交
775
/**
 * Return a heap-allocated copy of the STSma stored for SMA index `indexUid`.
 *
 * The copy is a plain struct copy made with memcpy.
 * NOTE(review): pointer members of the copy presumably still refer to memory
 * owned by the reader, which is cleared before returning — confirm that
 * callers do not dereference them.
 *
 * @return the copy, or NULL when the entry cannot be loaded or memory runs
 *         out (terrno is then TSDB_CODE_OUT_OF_MEMORY).
 */
STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
  SMetaReader reader = {0};
  STSma      *pCopy = NULL;

  metaReaderInit(&reader, pMeta, 0);

  if (metaGetTableEntryByUid(&reader, indexUid) < 0) {
    metaWarn("vgId:%d, failed to get table entry for smaId:%" PRIi64, TD_VID(pMeta->pVnode), indexUid);
  } else {
    pCopy = (STSma *)taosMemoryMalloc(sizeof(STSma));
    if (pCopy != NULL) {
      memcpy(pCopy, reader.me.smaEntry.tsma, sizeof(STSma));
    } else {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  metaReaderClear(&reader);
  return pCopy;
}

C
Cary Xu 已提交
797
/**
 * Collect the uids of all SMA indexes defined on table `uid`.
 *
 * @return an array of tb_uid_t (to be released with taosArrayDestroy), or
 *         NULL when the table has no SMA index, the cursor cannot be opened,
 *         or memory runs out (terrno is then TSDB_CODE_OUT_OF_MEMORY).
 */
SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) {
  SArray     *pUids = NULL;
  SSmaIdxKey *pSmaIdxKey = NULL;

  SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid);
  if (!pCur) {
    return NULL;
  }

  while (1) {
    tb_uid_t id = metaSmaCursorNext(pCur);
    if (id == 0) {
      break;
    }

    // The result array is created lazily, on the first hit.
    if (!pUids) {
      pUids = taosArrayInit(16, sizeof(tb_uid_t));
      if (!pUids) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        metaCloseSmaCursor(pCur);
        return NULL;
      }
    }

    pSmaIdxKey = (SSmaIdxKey *)pCur->pKey;

    // taosArrayPush reports failure through a NULL return, so the result
    // must be compared against NULL rather than tested with `< 0`.
    if (taosArrayPush(pUids, &pSmaIdxKey->smaUid) == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      metaCloseSmaCursor(pCur);
      taosArrayDestroy(pUids);
      return NULL;
    }
  }

  metaCloseSmaCursor(pCur);
  return pUids;
}

C
Cary Xu 已提交
835
/**
 * Collect the uids of tables that have at least one SMA index.
 *
 * Consecutive records carrying the same table uid are reported once.
 *
 * @return an array of tb_uid_t (to be released with taosArrayDestroy), or
 *         NULL when nothing is found, the cursor cannot be opened, or memory
 *         runs out (terrno is then TSDB_CODE_OUT_OF_MEMORY).
 */
SArray *metaGetSmaTbUids(SMeta *pMeta) {
  SArray  *pUids = NULL;
  tb_uid_t lastUid = 0;

  SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, 0);
  if (!pCur) {
    return NULL;
  }

  while (1) {
    tb_uid_t uid = metaSmaCursorNext(pCur);
    if (uid == 0) {
      break;
    }

    // Skip further SMA records of the table just recorded.
    if (lastUid == uid) {
      continue;
    }

    lastUid = uid;

    // The result array is created lazily, on the first hit.
    if (!pUids) {
      pUids = taosArrayInit(16, sizeof(tb_uid_t));
      if (!pUids) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        metaCloseSmaCursor(pCur);
        return NULL;
      }
    }

    // taosArrayPush reports failure through a NULL return, so the result
    // must be compared against NULL rather than tested with `< 0`.
    if (taosArrayPush(pUids, &uid) == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      metaCloseSmaCursor(pCur);
      taosArrayDestroy(pUids);
      return NULL;
    }
  }

  metaCloseSmaCursor(pCur);
  return pUids;
}

L
Liu Jicong 已提交
878
#endif
H
Hongze Cheng 已提交
879

880
/**
 * Look up one tag value inside an encoded tag blob.
 *
 * @param pTag  encoded tag data, interpreted as an STag.
 * @param type  data type of the tag column; for TSDB_DATA_TYPE_JSON the blob
 *              itself is returned and no lookup is done.
 * @param val   in/out lookup descriptor handed to tTagGet().
 * @return pTag for JSON tags, NULL when tTagGet() does not find the value,
 *         otherwise val.
 *
 * After a successful lookup the found value and every value in the tag are
 * written to the debug log. That tracing is best effort: it is skipped when a
 * temporary buffer cannot be allocated and never changes the result.
 */
const void *metaGetTableTagVal(void *pTag, int16_t type, STagVal *val) {
  STag *tag = (STag *)pTag;
  if (type == TSDB_DATA_TYPE_JSON) {
    return tag;
  }

  if (!tTagGet(tag, val)) {
    return NULL;
  }

  // Trace the value that was just looked up.
  if (IS_VAR_DATA_TYPE(val->type)) {
    // Copy into a zero-filled buffer one byte longer so "%s" sees a terminator.
    char *buf = taosMemoryCalloc(val->nData + 1, 1);
    if (buf != NULL) {
      memcpy(buf, val->pData, val->nData);
      metaDebug("metaTag table val varchar index:%d cid:%d type:%d value:%s", 1, val->cid, val->type, buf);
      taosMemoryFree(buf);
    }
  } else {
    double dval = 0;
    GET_TYPED_DATA(dval, double, val->type, &val->i64);
    metaDebug("metaTag table val number index:%d cid:%d type:%d value:%f", 1, val->cid, val->type, dval);
  }

  // Trace every value stored in the tag.
  SArray *pTagVals = NULL;
  if (tTagToValArray(tag, &pTagVals) == 0 && pTagVals != NULL) {
    int nTagVals = (int)taosArrayGetSize(pTagVals);
    for (int i = 0; i < nTagVals; i++) {
      STagVal *pTagVal = (STagVal *)taosArrayGet(pTagVals, i);

      if (IS_VAR_DATA_TYPE(pTagVal->type)) {
        char *buf = taosMemoryCalloc(pTagVal->nData + 1, 1);
        if (buf != NULL) {
          memcpy(buf, pTagVal->pData, pTagVal->nData);
          metaDebug("metaTag table varchar index:%d cid:%d type:%d value:%s", i, pTagVal->cid, pTagVal->type, buf);
          taosMemoryFree(buf);
        }
      } else {
        double dval = 0;
        GET_TYPED_DATA(dval, double, pTagVal->type, &pTagVal->i64);
        metaDebug("metaTag table number index:%d cid:%d type:%d value:%f", i, pTagVal->cid, pTagVal->type, dval);
      }
    }
  }

  // The array was allocated by tTagToValArray() and used to leak on every call.
  if (pTagVals != NULL) {
    taosArrayDestroy(pTagVals);
  }

  return val;
}
wmmhello's avatar
wmmhello 已提交
921 922

typedef struct {
H
Hongze Cheng 已提交
923 924
  SMeta   *pMeta;
  TBC     *pCur;
wmmhello's avatar
wmmhello 已提交
925 926 927
  tb_uid_t suid;
  int16_t  cid;
  int16_t  type;
H
Hongze Cheng 已提交
928 929
  void    *pKey;
  void    *pVal;
wmmhello's avatar
wmmhello 已提交
930 931 932 933
  int32_t  kLen;
  int32_t  vLen;
} SIdxCursor;

dengyihao's avatar
dengyihao 已提交
934 935 936
/**
 * Scan the tag index and append the uid of every entry accepted by
 * param->filterFunc to pUids.
 *
 * The scan starts at the index key built from (param->suid, param->cid,
 * param->val) and walks forward, or backward when param->reverse is set.
 * param->filterFunc is called as filterFunc(entryData, keyData, keyType):
 *   0     - match: the entry's uid is appended to pUids,
 *   1     - no match, keep iterating,
 *   other - no match and no further results: stop.
 *
 * The meta read lock is held for the duration of the scan and is released on
 * every path out of the function.
 *
 * @param pMeta  meta handle whose tag index (pMeta->pTagIdx) is scanned.
 * @param param  filter description; param->val must not be NULL.
 * @param pUids  array that receives the matching uids (tb_uid_t elements).
 * @return 0 on success (including "nothing matched"); non-zero on failure:
 *         -1 for a NULL filter value, allocation failure, failed NCHAR
 *         conversion or failed cursor positioning, otherwise the code
 *         reported by tdbTbcOpen() / metaCreateTagIdxKey(). On failure pUids
 *         may already hold the uids found so far.
 *
 * NOTE(review): entries are only compared on type and suid, not on cid;
 * confirm whether the column id should terminate the scan as well.
 */
int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
  int32_t     ret = 0;
  char       *buf = NULL;
  STagIdxKey *pKey = NULL;
  int32_t     nKey = 0;
  int32_t     maxSize = 0;
  int32_t     nTagData = 0;
  void       *tagData = NULL;
  int         c = 0;

  // Reject a NULL filter value before taking the lock or allocating anything.
  // Previously this path returned with the read lock still held and leaked
  // both the open cursor and the cursor object.
  if (param->val == NULL) {
    metaError("vgId:%d, failed to filter NULL data", TD_VID(pMeta->pVnode));
    return -1;
  }

  SIdxCursor *pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor));
  if (pCursor == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pCursor->pMeta = pMeta;
  pCursor->suid = param->suid;
  pCursor->cid = param->cid;
  pCursor->type = param->type;

  metaRLock(pMeta);
  ret = tdbTbcOpen(pMeta->pTagIdx, &pCursor->pCur, NULL);
  if (ret < 0) {
    goto END;
  }

  // Work out the raw bytes of the value to search for.
  if (IS_VAR_DATA_TYPE(param->type)) {
    tagData = varDataVal(param->val);
    nTagData = varDataLen(param->val);

    if (param->type == TSDB_DATA_TYPE_NCHAR) {
      // NCHAR values are converted to UCS-4 first; the buffer is sized at four
      // bytes per input byte plus one.
      maxSize = 4 * nTagData + 1;
      buf = taosMemoryCalloc(1, maxSize);
      if (buf == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        ret = -1;
        goto END;
      }
      if (false == taosMbsToUcs4(tagData, nTagData, (TdUcs4 *)buf, maxSize, &maxSize)) {
        ret = -1;  // used to fall through to END with ret == 0
        goto END;
      }

      // maxSize was updated by the conversion and is used as the data length.
      tagData = buf;
      nTagData = maxSize;
    }
  } else {
    tagData = param->val;
    nTagData = tDataTypes[param->type].bytes;
  }

  ret = metaCreateTagIdxKey(pCursor->suid, pCursor->cid, tagData, nTagData, pCursor->type,
                            param->reverse ? INT64_MAX : INT64_MIN, &pKey, &nKey);
  if (ret != 0) {
    goto END;
  }

  if (tdbTbcMoveTo(pCursor->pCur, pKey, nKey, &c) < 0) {
    ret = -1;  // used to be reported as success with an empty result
    goto END;
  }

  // "seeking" stays true until the first entry of the wanted type and suid has
  // been seen; until then entries of another type are stepped over, not fatal.
  bool seeking = true;
  for (;;) {
    void   *entryKey = NULL, *entryVal = NULL;
    int32_t nEntryKey = 0, nEntryVal = 0;

    if (tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, (const void **)&entryVal, &nEntryVal) < 0) {
      break;
    }

    STagIdxKey *p = entryKey;
    if (p->type != pCursor->type) {
      if (!seeking) {
        break;
      }
      if ((param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur)) < 0) {
        break;
      }
      continue;
    }
    if (p->suid != pKey->suid) {
      break;
    }
    seeking = false;

    int32_t verdict = (*param->filterFunc)(p->data, pKey->data, pKey->type);
    if (verdict == 0) {
      // Match: the table uid is stored directly behind the tag value.
      size_t   off = IS_VAR_DATA_TYPE(pKey->type) ? (size_t)varDataTLen(p->data) : (size_t)tDataTypes[pCursor->type].bytes;
      tb_uid_t tuid = 0;
      memcpy(&tuid, p->data + off, sizeof(tuid));  // the uid may be unaligned inside the key

      if (taosArrayPush(pUids, &tuid) == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        ret = -1;
        break;
      }
    } else if (verdict != 1) {
      // No match and no further results.
      break;
    }
    // verdict == 1: no match, but later entries may still qualify.

    if ((param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur)) < 0) {
      break;
    }
  }

END:
  if (pCursor->pMeta) metaULock(pCursor->pMeta);
  if (pCursor->pCur) tdbTbcClose(pCursor->pCur);
  taosMemoryFree(buf);
  taosMemoryFree(pKey);

  taosMemoryFree(pCursor);

  return ret;
}
1050

1051
/**
 * Walk the child tables of super table suid and store each table's cursor
 * value (pCur->pVal, pCur->vLen) in the tags hash, keyed by table uid.
 *
 * uidList works in two modes:
 *   - non-empty on entry: it acts as a filter, only uids already in the list
 *     are added to tags;
 *   - empty on entry: every child table is reported and its uid is appended
 *     to uidList.
 *
 * @param pMeta    meta handle handed to metaOpenCtbCursor().
 * @param suid     uid of the super table whose child tables are visited.
 * @param uidList  array of table uids, used as filter or filled (see above).
 * @param tags     hash receiving uid -> value entries.
 * @return TSDB_CODE_SUCCESS on success, -1 when the child-table cursor cannot
 *         be opened, TSDB_CODE_OUT_OF_MEMORY when the lookup hash cannot be
 *         created or uidList cannot grow. On failure uidList and tags may hold
 *         the entries collected so far.
 *
 * NOTE(review): the results of taosHashPut() are still ignored, as before;
 * confirm whether a failed insert should be surfaced to the caller.
 */
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj *tags) {
  int32_t   code = TSDB_CODE_SUCCESS;
  SHashObj *uHash = NULL;

  SMCtbCursor *pCur = metaOpenCtbCursor(pMeta, suid);
  if (pCur == NULL) {
    // Previously the NULL cursor was passed straight to metaCtbCursorNext().
    return -1;
  }

  size_t len = taosArrayGetSize(uidList);  // len > 0 means there already have uids
  if (len > 0) {
    uHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
    if (uHash == NULL) {
      // Without the lookup hash every table would be filtered out and an empty
      // result returned as success.
      metaCloseCtbCursor(pCur);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    for (int i = 0; (size_t)i < len; i++) {
      int64_t *uid = taosArrayGet(uidList, i);
      taosHashPut(uHash, uid, sizeof(int64_t), &i, sizeof(i));
    }
  }

  while (1) {
    tb_uid_t id = metaCtbCursorNext(pCur);
    if (id == 0) {
      break;  // the cursor reports 0 once it is exhausted
    }

    if (len > 0) {
      if (taosHashGet(uHash, &id, sizeof(int64_t)) == NULL) {
        continue;  // not one of the requested tables
      }
    } else if (taosArrayPush(uidList, &id) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      break;
    }

    taosHashPut(tags, &id, sizeof(int64_t), pCur->pVal, pCur->vLen);
  }

  taosHashCleanup(uHash);
  metaCloseCtbCursor(pCur);
  return code;
}
wmmhello's avatar
wmmhello 已提交
1082

H
Hongze Cheng 已提交
1083 1084
// Forward declaration of the meta-cache lookup used by metaGetInfo() below,
// which treats a return value of 0 as a cache hit (presumably with *pInfo
// filled in by the lookup - verify against the definition).
int32_t metaCacheGet(SMeta *pMeta, int64_t uid, SMetaInfo *pInfo);

H
Hongze Cheng 已提交
1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121
/**
 * Fetch the meta info of table uid, trying the meta cache before the uid index.
 *
 * Both lookups run under the meta read lock. When the entry comes from the uid
 * index, *pInfo is filled from the stored SUidIdxVal and then written to the
 * cache under the write lock.
 *
 * @param pMeta  meta handle.
 * @param uid    table uid to look up.
 * @param pInfo  output; filled on success.
 * @return 0 on success, TSDB_CODE_NOT_FOUND when the uid is in neither the
 *         cache nor the uid index.
 */
int32_t metaGetInfo(SMeta *pMeta, int64_t uid, SMetaInfo *pInfo) {
  int32_t rc = 0;
  void   *pRaw = NULL;
  int     nRaw = 0;

  // Do both lookups inside one read-locked section; the index is only
  // consulted when the cache misses.
  metaRLock(pMeta);
  bool cached = (metaCacheGet(pMeta, uid, pInfo) == 0);
  bool missing = !cached && (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pRaw, &nRaw) < 0);
  metaULock(pMeta);

  if (cached) {
    // served from the cache, nothing left to do
  } else if (missing) {
    rc = TSDB_CODE_NOT_FOUND;
  } else {
    const SUidIdxVal *pIdxVal = (const SUidIdxVal *)pRaw;

    pInfo->uid = uid;
    pInfo->suid = pIdxVal->suid;
    pInfo->version = pIdxVal->version;
    pInfo->skmVer = pIdxVal->skmVer;

    // remember the entry for the next lookup
    metaWLock(pMeta);
    metaCacheUpsert(pMeta, pInfo);
    metaULock(pMeta);
  }

  tdbFree(pRaw);
  return rc;
}