metaQuery.c 19.6 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "meta.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
/*
 * Prepare `pReader` for lookups against `pMeta`.
 *
 * The reader is zero-filled, bound to `pMeta` and `flags`, and metaRLock() is
 * called on `pMeta`. The matching metaULock() is issued by metaReaderClear().
 */
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) {
  memset(pReader, 0, sizeof(SMetaReader));

  pReader->pMeta = pMeta;
  pReader->flags = flags;

  metaRLock(pMeta);
}
H
Hongze Cheng 已提交
24

H
Hongze Cheng 已提交
25
/*
 * Release everything a reader holds: calls metaULock() when the reader is
 * bound to a meta handle, clears the decoder and frees the lookup buffer.
 */
void metaReaderClear(SMetaReader *pReader) {
  SMeta *pBound = pReader->pMeta;

  if (pBound != NULL) {
    metaULock(pBound);
  }

  tDecoderClear(&pReader->coder);
  tdbFree(pReader->pBuf);
}

H
Hongze Cheng 已提交
33
/*
 * Load the table entry stored under (version, uid) into `pReader->me`.
 *
 * Returns 0 on success. Returns -1 when the key is missing from table.db (in
 * which case terrno is set to TSDB_CODE_PAR_TABLE_NOT_EXIST) or when the
 * stored record cannot be decoded.
 */
int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t uid) {
  STbDbKey key = {.version = version, .uid = uid};

  // fetch the raw record from table.db into the reader's buffer
  if (tdbTbGet(pReader->pMeta->pTbDb, &key, sizeof(key), &pReader->pBuf, &pReader->szBuf) < 0) {
    terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
    return -1;
  }

  // decode the raw record into pReader->me
  tDecoderInit(&pReader->coder, pReader->pBuf, pReader->szBuf);

  return (metaDecodeEntry(&pReader->coder, &pReader->me) < 0) ? -1 : 0;
}

H
Hongze Cheng 已提交
56
/*
 * Load the table entry for `uid` into `pReader->me`.
 *
 * uid.idx is consulted first; the leading int64_t of its value is used as the
 * version for the table.db lookup. Returns 0 on success, -1 on failure
 * (terrno is TSDB_CODE_PAR_TABLE_NOT_EXIST when the uid is not indexed).
 */
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
  // query uid.idx
  if (tdbTbGet(pReader->pMeta->pUidIdx, &uid, sizeof(uid), &pReader->pBuf, &pReader->szBuf) < 0) {
    terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
    return -1;
  }

  int64_t latest = *(int64_t *)pReader->pBuf;

  return metaGetTableEntryByVersion(pReader, latest, uid);
}

H
Hongze Cheng 已提交
70
/*
 * Load the table entry called `name` into `pReader->me`.
 *
 * name.idx (keyed by the NUL-terminated name) yields the uid, which is then
 * resolved through metaGetTableEntryByUid(). Returns 0 on success, -1 on
 * failure (terrno is TSDB_CODE_PAR_TABLE_NOT_EXIST when the name is unknown).
 */
int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
  // query name.idx; the key includes the terminating NUL
  if (tdbTbGet(pReader->pMeta->pNameIdx, name, strlen(name) + 1, &pReader->pBuf, &pReader->szBuf) < 0) {
    terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
    return -1;
  }

  tb_uid_t resolved = *(tb_uid_t *)pReader->pBuf;

  return metaGetTableEntryByUid(pReader, resolved);
}

H
Hongze Cheng 已提交
84
/*
 * Resolve a table name to its uid through name.idx.
 *
 * The lookup runs between metaRLock() and metaULock(). Returns the uid, or 0
 * when the name is not present in the index.
 */
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {
  tb_uid_t found = 0;
  void    *pVal = NULL;
  int      nVal = 0;

  metaRLock(pMeta);

  if (tdbTbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pVal, &nVal) == 0) {
    found = *(tb_uid_t *)pVal;
    tdbFree(pVal);
  }

  metaULock(pMeta);

  return found;
}

H
Hongze Cheng 已提交
101
/*
 * Placeholder for sequential reads through a meta reader.
 *
 * Not implemented yet: the reader is not touched and 0 is always returned.
 */
int metaReadNext(SMetaReader *pReader) {
  (void)pReader;  // unused until the TODO below is implemented

  // TODO

  return 0;
}

#if 1  // ===================================================
H
Hongze Cheng 已提交
110 111 112 113 114 115 116 117
/*
 * Open a cursor that walks uid.idx from its first entry.
 *
 * The embedded reader is initialised with metaReaderInit(), which calls
 * metaRLock(); the lock stays held until metaCloseTbCursor().
 *
 * Returns the new cursor, or NULL when the allocation or the TDB cursor open
 * fails. On failure nothing is left allocated and the reader is cleared.
 */
SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
  SMTbCursor *pTbCur = (SMTbCursor *)taosMemoryCalloc(1, sizeof(*pTbCur));
  if (pTbCur == NULL) {
    return NULL;
  }

  metaReaderInit(&pTbCur->mr, pMeta, 0);

  // Without this check a failed open would be followed by cursor calls on an
  // unopened handle, and the reader (with its lock) would never be released.
  if (tdbTbcOpen(pMeta->pUidIdx, &pTbCur->pDbc, NULL) < 0) {
    metaReaderClear(&pTbCur->mr);
    taosMemoryFree(pTbCur);
    return NULL;
  }

  tdbTbcMoveToFirst(pTbCur->pDbc);

  return pTbCur;
}

/*
 * Destroy a cursor created by metaOpenTbCursor(). NULL is accepted.
 *
 * Frees the key/value buffers, clears the embedded reader, closes the TDB
 * cursor when one is open, then frees the cursor itself.
 */
void metaCloseTbCursor(SMTbCursor *pTbCur) {
  if (pTbCur == NULL) {
    return;
  }

  tdbFree(pTbCur->pKey);
  tdbFree(pTbCur->pVal);
  metaReaderClear(&pTbCur->mr);

  if (pTbCur->pDbc != NULL) {
    tdbTbcClose(pTbCur->pDbc);
  }

  taosMemoryFree(pTbCur);
}

H
Hongze Cheng 已提交
139
/*
 * Advance the cursor to the next table that is not a super table.
 *
 * Each uid.idx entry (key = uid, value starts with the version) is resolved
 * through metaGetTableEntryByVersion() into `pTbCur->mr.me`.
 *
 * Returns 0 when `pTbCur->mr.me` holds the next table. Returns -1 when the
 * index is exhausted or when an indexed entry cannot be loaded.
 */
int metaTbCursorNext(SMTbCursor *pTbCur) {
  for (;;) {
    if (tdbTbcNext(pTbCur->pDbc, &pTbCur->pKey, &pTbCur->kLen, &pTbCur->pVal, &pTbCur->vLen) < 0) {
      return -1;
    }

    // A failed lookup leaves mr.me describing the previous entry; report the
    // failure instead of presenting that stale entry as the next table.
    if (metaGetTableEntryByVersion(&pTbCur->mr, *(int64_t *)pTbCur->pVal, *(tb_uid_t *)pTbCur->pKey) < 0) {
      return -1;
    }

    if (pTbCur->mr.me.type != TSDB_SUPER_TABLE) {
      return 0;
    }
    // super tables are skipped; keep scanning
  }
}

H
Hongze Cheng 已提交
161
/*
 * Return a heap-allocated copy of the row schema of table `uid`.
 *
 * For a child table the lookup is redirected to its super table (`suid`).
 * When `sver` is -1 or equals the schema version stored in the table entry,
 * that schema is cloned. Otherwise the schema is fetched from the schema db
 * (pSkmDb) under (uid, sver).
 *
 * The whole lookup runs between metaRLock() and metaULock(). `isinline` is
 * not used.
 *
 * Returns the cloned wrapper (owned by the caller), or NULL when any lookup
 * or decode step fails.
 */
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) {
  void           *pData = NULL;
  int             nData = 0;
  int64_t         version;
  SSchemaWrapper  schema = {0};
  SSchemaWrapper *pSchema = NULL;
  SDecoder        dc = {0};
  SMetaEntry      me;

  metaRLock(pMeta);

_query:
  // uid.idx: uid -> latest version
  if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData) < 0) {
    goto _exit;
  }
  version = *(int64_t *)pData;

  // table.db: (version, uid) -> encoded entry
  if (tdbTbGet(pMeta->pTbDb, &(STbDbKey){.uid = uid, .version = version}, sizeof(STbDbKey), &pData, &nData) < 0) {
    goto _exit;
  }

  memset(&me, 0, sizeof(me));
  tDecoderInit(&dc, pData, nData);
  if (metaDecodeEntry(&dc, &me) < 0) {
    tDecoderClear(&dc);
    goto _exit;
  }

  if (me.type == TSDB_SUPER_TABLE) {
    if (sver == -1 || sver == me.stbEntry.schemaRow.version) {
      // clone before clearing the decoder that backs the decoded entry
      pSchema = tCloneSSchemaWrapper(&me.stbEntry.schemaRow);
      tDecoderClear(&dc);
      goto _exit;
    }
  } else if (me.type == TSDB_CHILD_TABLE) {
    // a child table takes its schema from the super table
    uid = me.ctbEntry.suid;
    tDecoderClear(&dc);
    goto _query;
  } else {
    if (sver == -1 || sver == me.ntbEntry.schemaRow.version) {
      pSchema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow);
      tDecoderClear(&dc);
      goto _exit;
    }
  }
  tDecoderClear(&dc);

  // requested version differs from the current one: query from skm db
  if (tdbTbGet(pMeta->pSkmDb, &(SSkmDbKey){.uid = uid, .sver = sver}, sizeof(SSkmDbKey), &pData, &nData) < 0) {
    goto _exit;
  }

  tDecoderInit(&dc, pData, nData);
  if (tDecodeSSchemaWrapper(&dc, &schema) >= 0) {
    pSchema = tCloneSSchemaWrapper(&schema);
  }
  tDecoderClear(&dc);
  // The decoded column array is a separate allocation (metaGetTbTSchemaEx
  // frees it the same way); the clone above owns its own copy.
  taosMemoryFree(schema.pSchema);

_exit:
  // pSchema is still NULL on every failure path
  metaULock(pMeta);
  tdbFree(pData);
  return pSchema;
}

H
Hongze Cheng 已提交
221 222 223
/*
 * Append to `uidList` the uid of every ttl.idx entry visited when walking
 * backwards from the position of key (ttl, INT64_MAX).
 *
 * Returns the (negative) result of tdbTbcOpen() when the cursor cannot be
 * opened, otherwise 0.
 */
int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList) {
  TBC *pCursor;
  int  rc = tdbTbcOpen(pMeta->pTtlIdx, &pCursor, NULL);
  if (rc < 0) {
    return rc;
  }

  // position on (ttl, INT64_MAX)
  STtlIdxKey entry = {0};
  entry.dtime = ttl;
  entry.uid = INT64_MAX;

  int cmp = 0;
  tdbTbcMoveTo(pCursor, &entry, sizeof(entry), &cmp);
  if (cmp < 0) {
    tdbTbcMoveToPrev(pCursor);
  }

  // walk backwards, collecting uids, until the cursor reports failure
  void *pRawKey = NULL;
  int   rawKeyLen = 0;
  for (;;) {
    rc = tdbTbcPrev(pCursor, &pRawKey, &rawKeyLen, NULL, NULL);
    if (rc < 0) {
      break;
    }

    entry = *(STtlIdxKey *)pRawKey;
    taosArrayPush(uidList, &entry.uid);
  }

  tdbTbcClose(pCursor);
  tdbFree(pRawKey);

  return 0;
}

H
Hongze Cheng 已提交
254
struct SMCtbCursor {
H
Hongze Cheng 已提交
255 256
  SMeta   *pMeta;
  TBC     *pCur;
H
Hongze Cheng 已提交
257
  tb_uid_t suid;
H
Hongze Cheng 已提交
258 259
  void    *pKey;
  void    *pVal;
H
Hongze Cheng 已提交
260 261 262 263
  int      kLen;
  int      vLen;
};

H
Hongze Cheng 已提交
264 265
/*
 * Open a cursor over ctb.idx positioned at key (uid, INT64_MIN).
 *
 * metaRLock() is called on `pMeta` and the lock stays held until
 * metaCloseCtbCursor(). Returns NULL (with the lock released) when the
 * allocation or the TDB cursor open fails.
 */
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
  SCtbIdxKey   seekKey;
  int          cmp = 0;
  SMCtbCursor *pCursor = (SMCtbCursor *)taosMemoryCalloc(1, sizeof(SMCtbCursor));

  if (pCursor == NULL) {
    return NULL;
  }

  pCursor->pMeta = pMeta;
  pCursor->suid = uid;
  metaRLock(pMeta);

  if (tdbTbcOpen(pMeta->pCtbIdx, &pCursor->pCur, NULL) < 0) {
    metaULock(pMeta);
    taosMemoryFree(pCursor);
    return NULL;
  }

  // move to the suid
  seekKey.suid = uid;
  seekKey.uid = INT64_MIN;
  tdbTbcMoveTo(pCursor->pCur, &seekKey, sizeof(seekKey), &cmp);
  if (cmp > 0) {
    tdbTbcMoveToNext(pCursor->pCur);
  }

  return pCursor;
}

C
Cary Xu 已提交
297
/*
 * Destroy a cursor created by metaOpenCtbCursor(). NULL is accepted.
 *
 * Calls metaULock() when a meta handle is bound, closes the TDB cursor and
 * frees its key/value buffers when a cursor is open, then frees the object.
 */
void metaCloseCtbCursor(SMCtbCursor *pCtbCur) {
  if (pCtbCur == NULL) {
    return;
  }

  if (pCtbCur->pMeta != NULL) {
    metaULock(pCtbCur->pMeta);
  }

  if (pCtbCur->pCur != NULL) {
    tdbTbcClose(pCtbCur->pCur);

    tdbFree(pCtbCur->pKey);
    tdbFree(pCtbCur->pVal);
  }

  taosMemoryFree(pCtbCur);
}

/*
 * Step the cursor and return the uid stored in the next ctb.idx key.
 *
 * Returns 0 when the cursor cannot advance or when the next key's suid is
 * greater than the suid the cursor was opened for.
 */
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
  if (tdbTbcNext(pCtbCur->pCur, &pCtbCur->pKey, &pCtbCur->kLen, &pCtbCur->pVal, &pCtbCur->vLen) < 0) {
    return 0;
  }

  SCtbIdxKey *pEntry = pCtbCur->pKey;
  if (pEntry->suid <= pCtbCur->suid) {
    return pEntry->uid;
  }

  return 0;
}

C
Cary Xu 已提交
328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344
/* Cursor over suid.idx entries, created by metaOpenStbCursor(). */
struct SMStbCursor {
  SMeta   *pMeta;  // meta handle; metaRLock()'ed on open, metaULock()'ed on close
  TBC     *pCur;   // TDB cursor opened on pMeta->pSuidIdx
  tb_uid_t suid;   // suid passed to metaOpenStbCursor()
  void    *pKey;   // key buffer filled by tdbTbcNext()
  void    *pVal;   // value buffer filled by tdbTbcNext()
  int      kLen;   // length reported for pKey
  int      vLen;   // length reported for pVal
};

/*
 * Open a cursor over suid.idx positioned at `suid`.
 *
 * metaRLock() is called on `pMeta` and the lock stays held until
 * metaCloseStbCursor(). On allocation failure or TDB cursor open failure,
 * terrno is set to TSDB_CODE_OUT_OF_MEMORY and NULL is returned (with the
 * lock released in the latter case).
 */
SMStbCursor *metaOpenStbCursor(SMeta *pMeta, tb_uid_t suid) {
  int          cmp = 0;
  SMStbCursor *pCursor = (SMStbCursor *)taosMemoryCalloc(1, sizeof(SMStbCursor));

  if (pCursor == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pCursor->pMeta = pMeta;
  pCursor->suid = suid;
  metaRLock(pMeta);

  if (tdbTbcOpen(pMeta->pSuidIdx, &pCursor->pCur, NULL) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    metaULock(pMeta);
    taosMemoryFree(pCursor);
    return NULL;
  }

  // move to the suid
  tdbTbcMoveTo(pCursor->pCur, &suid, sizeof(suid), &cmp);
  if (cmp > 0) {
    tdbTbcMoveToNext(pCursor->pCur);
  }

  return pCursor;
}

/*
 * Destroy a cursor created by metaOpenStbCursor(). NULL is accepted.
 *
 * Calls metaULock() when a meta handle is bound, closes the TDB cursor and
 * frees its key/value buffers when a cursor is open, then frees the object.
 */
void metaCloseStbCursor(SMStbCursor *pStbCur) {
  if (pStbCur == NULL) {
    return;
  }

  if (pStbCur->pMeta != NULL) {
    metaULock(pStbCur->pMeta);
  }

  if (pStbCur->pCur != NULL) {
    tdbTbcClose(pStbCur->pCur);

    tdbFree(pStbCur->pKey);
    tdbFree(pStbCur->pVal);
  }

  taosMemoryFree(pStbCur);
}

/*
 * Step the cursor and return the uid stored at the start of the next
 * suid.idx key, or 0 when the cursor cannot advance.
 */
tb_uid_t metaStbCursorNext(SMStbCursor *pStbCur) {
  if (tdbTbcNext(pStbCur->pCur, &pStbCur->pKey, &pStbCur->kLen, &pStbCur->pVal, &pStbCur->vLen) < 0) {
    return 0;
  }

  tb_uid_t *pUid = (tb_uid_t *)pStbCur->pKey;
  return *pUid;
}

H
Hongze Cheng 已提交
394
/*
 * Build an STSchema for table `uid` at schema version `sver`.
 *
 * The column list comes from metaGetTableSchema(); each column is fed to a
 * schema builder and the wrapper is freed before returning.
 *
 * Returns the built schema, or NULL when the table schema cannot be loaded.
 */
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
  SSchemaWrapper *pWrapper = metaGetTableSchema(pMeta, uid, sver, 0);
  if (pWrapper == NULL) {
    return NULL;
  }

  STSchemaBuilder builder = {0};
  tdInitTSchemaBuilder(&builder, pWrapper->version);

  for (int col = 0; col < pWrapper->nCols; col++) {
    SSchema *pCol = &pWrapper->pSchema[col];
    tdAddColToSchema(&builder, pCol->type, pCol->flags, pCol->colId, pCol->bytes);
  }

  STSchema *pResult = tdGetSchemaFromBuilder(&builder);
  tdDestroyTSchemaBuilder(&builder);

  // the wrapper and its column array are separate allocations
  taosMemoryFree(pWrapper->pSchema);
  taosMemoryFree(pWrapper);

  return pResult;
}

H
Hongze Cheng 已提交
418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462
/*
 * Build an STSchema from the schema db entry (suid ? suid : uid, sver).
 *
 * The db lookup runs between metaRLock() and metaULock(); decoding and
 * conversion happen after the lock is released.
 *
 * On success `*ppTSchema` receives the result of tdGetSchemaFromBuilder()
 * and 0 is returned. When the key is not found `*ppTSchema` is set to NULL
 * and TSDB_CODE_NOT_FOUND is returned.
 */
int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema **ppTSchema) {
  SSkmDbKey skmDbKey = {.uid = suid ? suid : uid, .sver = sver};
  void     *pData = NULL;
  int       nData = 0;

  // query
  metaRLock(pMeta);
  if (tdbTbGet(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), &pData, &nData) < 0) {
    metaULock(pMeta);
    tdbFree(pData);
    *ppTSchema = NULL;
    return TSDB_CODE_NOT_FOUND;
  }
  metaULock(pMeta);

  // decode; zero-initialised so a failed decode cannot leave garbage behind
  SDecoder       dc = {0};
  SSchemaWrapper schema = {0};

  tDecoderInit(&dc, pData, nData);
  tDecodeSSchemaWrapper(&dc, &schema);
  tDecoderClear(&dc);

  // convert
  STSchemaBuilder sb = {0};

  tdInitTSchemaBuilder(&sb, schema.version);
  for (int i = 0; i < schema.nCols; i++) {
    SSchema *pSchema = schema.pSchema + i;
    tdAddColToSchema(&sb, pSchema->type, pSchema->flags, pSchema->colId, pSchema->bytes);
  }
  *ppTSchema = tdGetSchemaFromBuilder(&sb);
  tdDestroyTSchemaBuilder(&sb);

  taosMemoryFree(schema.pSchema);
  // The lookup buffer was previously never released here; the other
  // tdbTbGet() callers in this file free theirs with tdbFree().
  tdbFree(pData);

  return 0;
}

C
Cary Xu 已提交
463 464 465 466 467
/*
 * Placeholder for the table count of `pMeta`.
 *
 * TODO: not implemented yet; 0 is always returned.
 */
int metaGetTbNum(SMeta *pMeta) {
  (void)pMeta;
  return 0;
}
H
Hongze Cheng 已提交
468

C
Cary Xu 已提交
469
typedef struct {
H
Hongze Cheng 已提交
470 471
  SMeta   *pMeta;
  TBC     *pCur;
C
Cary Xu 已提交
472
  tb_uid_t uid;
H
Hongze Cheng 已提交
473 474
  void    *pKey;
  void    *pVal;
C
Cary Xu 已提交
475 476 477 478 479 480 481 482 483 484 485 486 487
  int      kLen;
  int      vLen;
} SMSmaCursor;

/*
 * Open a cursor over sma.idx positioned at key (uid, INT64_MIN).
 *
 * metaRLock() is called on `pMeta` and the lock stays held until
 * metaCloseSmaCursor(). Returns NULL when the allocation fails (terrno is
 * TSDB_CODE_OUT_OF_MEMORY) or when the TDB cursor cannot be opened (the lock
 * is released first).
 */
SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) {
  SMSmaCursor *pSmaCur = NULL;
  SSmaIdxKey   smaIdxKey;
  int          ret;
  // Initialised like the other cursor openers in this file: the result of
  // tdbTbcMoveTo() is not checked, so `c` must not be read uninitialised.
  int          c = 0;

  pSmaCur = (SMSmaCursor *)taosMemoryCalloc(1, sizeof(*pSmaCur));
  if (pSmaCur == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pSmaCur->pMeta = pMeta;
  pSmaCur->uid = uid;
  metaRLock(pMeta);

  ret = tdbTbcOpen(pMeta->pSmaIdx, &pSmaCur->pCur, NULL);
  if (ret < 0) {
    metaULock(pMeta);
    taosMemoryFree(pSmaCur);
    return NULL;
  }

  // move to the first key of this uid
  smaIdxKey.uid = uid;
  smaIdxKey.smaUid = INT64_MIN;
  tdbTbcMoveTo(pSmaCur->pCur, &smaIdxKey, sizeof(smaIdxKey), &c);
  if (c > 0) {
    tdbTbcMoveToNext(pSmaCur->pCur);
  }

  return pSmaCur;
}
H
Hongze Cheng 已提交
512

C
Cary Xu 已提交
513 514 515 516
/*
 * Destroy a cursor created by metaOpenSmaCursor(). NULL is accepted.
 *
 * Calls metaULock() when a meta handle is bound, closes the TDB cursor and
 * frees its key/value buffers when a cursor is open, then frees the object.
 */
void metaCloseSmaCursor(SMSmaCursor *pSmaCur) {
  if (pSmaCur == NULL) {
    return;
  }

  if (pSmaCur->pMeta != NULL) {
    metaULock(pSmaCur->pMeta);
  }

  if (pSmaCur->pCur != NULL) {
    tdbTbcClose(pSmaCur->pCur);

    tdbFree(pSmaCur->pKey);
    tdbFree(pSmaCur->pVal);
  }

  taosMemoryFree(pSmaCur);
}

/*
 * Step the cursor and return the `uid` field of the next sma.idx key.
 *
 * Returns 0 when the cursor cannot advance or when the next key's uid is
 * greater than the uid the cursor was opened for. The full key (including
 * smaUid) remains available through `pSmaCur->pKey`.
 */
tb_uid_t metaSmaCursorNext(SMSmaCursor *pSmaCur) {
  if (tdbTbcNext(pSmaCur->pCur, &pSmaCur->pKey, &pSmaCur->kLen, &pSmaCur->pVal, &pSmaCur->vLen) < 0) {
    return 0;
  }

  SSmaIdxKey *pEntry = pSmaCur->pKey;
  if (pEntry->uid <= pSmaCur->uid) {
    return pEntry->uid;
  }

  return 0;
}

/*
 * Collect the SMA definitions attached to table `uid`.
 *
 * The sma ids come from metaGetSmaIdsByTable(); each id is resolved with
 * metaGetTableEntryByUid() and its STSma copied into the wrapper. Ids that
 * cannot be resolved are logged and skipped.
 *
 * With `deepCopy` the `expr` and `tagsFilter` buffers are duplicated (a
 * zero-length buffer becomes NULL). Without it both pointers are set to NULL
 * and both lengths to 0.
 *
 * Returns the wrapper, or NULL when the table has no sma ids, no entry could
 * be resolved, or an allocation fails (terrno = TSDB_CODE_OUT_OF_MEMORY).
 */
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
  STSmaWrapper *pSW = NULL;
  SArray       *pSmaIds = NULL;
  // Zeroed before any `goto _err` so metaReaderClear() below never sees an
  // indeterminate reader (a zeroed reader holds no lock and no buffer).
  SMetaReader   mr = {0};
  int64_t       smaId;
  int           smaIdx = 0;
  STSma        *pTSma = NULL;

  if (!(pSmaIds = metaGetSmaIdsByTable(pMeta, uid))) {
    return NULL;
  }

  pSW = taosMemoryCalloc(1, sizeof(*pSW));
  if (!pSW) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pSW->number = taosArrayGetSize(pSmaIds);
  pSW->tSma = taosMemoryCalloc(pSW->number, sizeof(STSma));

  if (!pSW->tSma) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  metaReaderInit(&mr, pMeta, 0);
  for (int i = 0; i < pSW->number; ++i) {
    smaId = *(tb_uid_t *)taosArrayGet(pSmaIds, i);
    if (metaGetTableEntryByUid(&mr, smaId) < 0) {
      metaWarn("vgId:%d, no entry for tbId: %" PRIi64 ", smaId: %" PRIi64, TD_VID(pMeta->pVnode), uid, smaId);
      continue;
    }
    pTSma = pSW->tSma + smaIdx;
    memcpy(pTSma, mr.me.smaEntry.tsma, sizeof(STSma));
    if (deepCopy) {
      // Drop the shallow pointers first: they refer to reader-owned memory
      // and must never reach tFreeTSmaWrapper() on the error path.
      pTSma->expr = NULL;
      pTSma->tagsFilter = NULL;

      if (pTSma->exprLen > 0) {
        if (!(pTSma->expr = taosMemoryCalloc(1, pTSma->exprLen))) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          goto _err;
        }
        memcpy((void *)pTSma->expr, mr.me.smaEntry.tsma->expr, pTSma->exprLen);
      }
      if (pTSma->tagsFilterLen > 0) {
        if (!(pTSma->tagsFilter = taosMemoryCalloc(1, pTSma->tagsFilterLen))) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          goto _err;
        }
        memcpy((void *)pTSma->tagsFilter, mr.me.smaEntry.tsma->tagsFilter, pTSma->tagsFilterLen);
      }
    } else {
      pTSma->exprLen = 0;
      pTSma->expr = NULL;
      pTSma->tagsFilterLen = 0;
      pTSma->tagsFilter = NULL;
    }

    ++smaIdx;
  }

  if (smaIdx <= 0) goto _err;
  pSW->number = smaIdx;

  metaReaderClear(&mr);
  taosArrayDestroy(pSmaIds);
  return pSW;

_err:
  metaReaderClear(&mr);
  taosArrayDestroy(pSmaIds);
  tFreeTSmaWrapper(pSW, deepCopy);
  return NULL;
}

C
Cary Xu 已提交
617
/*
 * Return a heap-allocated copy of the STSma stored for sma index `indexUid`.
 *
 * Returns NULL when the entry cannot be loaded (a warning is logged) or when
 * the allocation fails (terrno = TSDB_CODE_OUT_OF_MEMORY).
 *
 * NOTE(review): only the STSma struct itself is copied; any pointer members
 * still refer to memory of the reader that is cleared before returning.
 * Confirm that callers do not dereference them.
 */
STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
  STSma      *pResult = NULL;
  SMetaReader reader = {0};

  metaReaderInit(&reader, pMeta, 0);

  if (metaGetTableEntryByUid(&reader, indexUid) < 0) {
    metaWarn("vgId:%d, failed to get table entry for smaId: %" PRIi64, TD_VID(pMeta->pVnode), indexUid);
  } else {
    pResult = (STSma *)taosMemoryMalloc(sizeof(STSma));
    if (pResult == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(pResult, reader.me.smaEntry.tsma, sizeof(STSma));
    }
  }

  metaReaderClear(&reader);
  return pResult;
}

C
Cary Xu 已提交
639
/*
 * Collect the smaUid of every sma.idx entry yielded by a cursor opened for
 * table `uid`.
 *
 * Returns a new array of tb_uid_t owned by the caller. Returns NULL when the
 * cursor cannot be opened, when no entry is found, or when an allocation
 * fails (terrno = TSDB_CODE_OUT_OF_MEMORY).
 */
SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) {
  SArray     *pUids = NULL;
  SSmaIdxKey *pSmaIdxKey = NULL;

  SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid);
  if (!pCur) {
    return NULL;
  }

  while (1) {
    tb_uid_t id = metaSmaCursorNext(pCur);
    if (id == 0) {
      break;
    }

    // the array is created lazily so "no entries" is reported as NULL
    if (!pUids) {
      pUids = taosArrayInit(16, sizeof(tb_uid_t));
      if (!pUids) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        metaCloseSmaCursor(pCur);
        return NULL;
      }
    }

    pSmaIdxKey = (SSmaIdxKey *)pCur->pKey;

    // taosArrayPush() reports failure with a NULL result; the previous
    // `< 0` comparison could never detect it.
    if (taosArrayPush(pUids, &pSmaIdxKey->smaUid) == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      metaCloseSmaCursor(pCur);
      taosArrayDestroy(pUids);
      return NULL;
    }
  }

  metaCloseSmaCursor(pCur);
  return pUids;
}

C
Cary Xu 已提交
677
/*
 * Collect the table uids yielded by a sma.idx cursor opened with uid 0,
 * skipping a uid equal to the one returned immediately before it.
 *
 * Returns a new array of tb_uid_t owned by the caller. Returns NULL when the
 * cursor cannot be opened, when no entry is found, or when an allocation
 * fails (terrno = TSDB_CODE_OUT_OF_MEMORY).
 */
SArray *metaGetSmaTbUids(SMeta *pMeta) {
  SArray  *pUids = NULL;
  tb_uid_t lastUid = 0;

  SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, 0);
  if (!pCur) {
    return NULL;
  }

  while (1) {
    tb_uid_t uid = metaSmaCursorNext(pCur);
    if (uid == 0) {
      break;
    }

    // consecutive entries of the same table are collapsed into one
    if (lastUid == uid) {
      continue;
    }

    lastUid = uid;

    if (!pUids) {
      pUids = taosArrayInit(16, sizeof(tb_uid_t));
      if (!pUids) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        metaCloseSmaCursor(pCur);
        return NULL;
      }
    }

    // taosArrayPush() reports failure with a NULL result; the previous
    // `< 0` comparison could never detect it.
    if (taosArrayPush(pUids, &uid) == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      metaCloseSmaCursor(pCur);
      taosArrayDestroy(pUids);
      return NULL;
    }
  }

  metaCloseSmaCursor(pCur);
  return pUids;
}

L
Liu Jicong 已提交
720
#endif
H
Hongze Cheng 已提交
721

wmmhello's avatar
wmmhello 已提交
722
/*
 * Fetch a tag value from a child-table entry (asserted).
 *
 * For TSDB_DATA_TYPE_JSON the entry's whole tag blob is returned. For any
 * other type tTagGet() is asked to fill `val`; `val` is returned when it
 * reports a hit and NULL otherwise.
 */
const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t type, STagVal *val) {
  ASSERT(pEntry->type == TSDB_CHILD_TABLE);

  STag *pTags = (STag *)pEntry->ctbEntry.pTags;
  if (type == TSDB_DATA_TYPE_JSON) {
    return pTags;
  }

  bool found = tTagGet(pTags, val);
  return found ? val : NULL;
}
wmmhello's avatar
wmmhello 已提交
735 736

typedef struct {
H
Hongze Cheng 已提交
737 738
  SMeta   *pMeta;
  TBC     *pCur;
wmmhello's avatar
wmmhello 已提交
739 740 741
  tb_uid_t suid;
  int16_t  cid;
  int16_t  type;
H
Hongze Cheng 已提交
742 743
  void    *pKey;
  void    *pVal;
wmmhello's avatar
wmmhello 已提交
744 745 746 747 748 749
  int32_t  kLen;
  int32_t  vLen;
} SIdxCursor;

/*
 * Scan the tag index and append to `pUids` the uid of every entry for which
 * `param->filterFunc` returns 0.
 *
 * A seek key is built from (suid, cid, value, type) with uid INT64_MAX when
 * `param->reverse` is set and INT64_MIN otherwise; the scan then walks
 * backwards or forwards accordingly. NCHAR values are converted with
 * taosMbsToUcs4() first. filterFunc results: 0 = match, 1 = no match but keep
 * scanning, anything else = stop.
 *
 * The scan runs between metaRLock() and metaULock(); every exit path after
 * the lock is taken goes through END so lock, cursor and buffers are released.
 *
 * Returns 0 after a completed scan; -1 when `param->val` is NULL or an
 * allocation fails; otherwise the failing result of tdbTbcOpen() or
 * metaCreateTagIdxKey(). As before, a failed NCHAR conversion or a failed
 * tdbTbcMoveTo() ends the scan without changing the return value.
 */
int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
  int32_t     ret = 0;
  int32_t     valid = 0;
  char       *buf = NULL;
  int32_t     maxSize = 0;
  STagIdxKey *pKey = NULL;
  int32_t     nKey = 0;
  int32_t     nTagData = 0;
  void       *tagData = NULL;
  int         cmp = 0;
  void       *entryKey = NULL;
  void       *entryVal = NULL;
  int32_t     nEntryKey = 0;
  int32_t     nEntryVal = 0;
  bool        first = true;

  SIdxCursor *pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor));
  if (pCursor == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pCursor->pMeta = pMeta;
  pCursor->suid = param->suid;
  pCursor->cid = param->cid;
  pCursor->type = param->type;

  metaRLock(pMeta);
  ret = tdbTbcOpen(pMeta->pTagIdx, &pCursor->pCur, NULL);
  if (ret < 0) {
    goto END;
  }

  if (param->val == NULL) {
    metaError("vgId:%d, failed to filter NULL data", TD_VID(pMeta->pVnode));
    // go through END: a bare return here left the lock held and leaked the cursor
    ret = -1;
    goto END;
  }

  if (IS_VAR_DATA_TYPE(param->type)) {
    tagData = varDataVal(param->val);
    nTagData = varDataLen(param->val);

    if (param->type == TSDB_DATA_TYPE_NCHAR) {
      maxSize = 4 * nTagData + 1;
      buf = taosMemoryCalloc(1, maxSize);
      if (buf == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        ret = -1;
        goto END;
      }
      if (false == taosMbsToUcs4(tagData, nTagData, (TdUcs4 *)buf, maxSize, &maxSize)) {
        goto END;
      }

      tagData = buf;
      nTagData = maxSize;
    }
  } else {
    tagData = param->val;
    nTagData = tDataTypes[param->type].bytes;
  }

  ret = metaCreateTagIdxKey(pCursor->suid, pCursor->cid, tagData, nTagData, pCursor->type,
                            param->reverse ? INT64_MAX : INT64_MIN, &pKey, &nKey);
  if (ret != 0) {
    goto END;
  }

  if (tdbTbcMoveTo(pCursor->pCur, pKey, nKey, &cmp) < 0) {
    goto END;
  }

  while (1) {
    valid = tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, (const void **)&entryVal, &nEntryVal);
    if (valid < 0) {
      break;
    }

    STagIdxKey *p = entryKey;
    if (p != NULL && p->type != pCursor->type) {
      if (!first) {
        break;
      }
      // before the first type match, step over entries of other types
      valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
      if (valid < 0) break;
      continue;
    }
    first = false;

    if (p != NULL) {
      int32_t verdict = (*param->filterFunc)(p->data, pKey->data, pKey->type);
      if (verdict == 0) {
        // match: the table uid is stored right after the tag value
        tb_uid_t tuid = 0;
        if (IS_VAR_DATA_TYPE(pKey->type)) {
          tuid = *(tb_uid_t *)(p->data + varDataTLen(p->data));
        } else {
          tuid = *(tb_uid_t *)(p->data + tDataTypes[pCursor->type].bytes);
        }
        taosArrayPush(pUids, &tuid);
      } else if (verdict == 1) {
        // not match but should continue to iter
      } else {
        // not match and no more result
        break;
      }
    }

    valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
    if (valid < 0) {
      break;
    }
  }

END:
  if (pCursor->pMeta) metaULock(pCursor->pMeta);
  if (pCursor->pCur) tdbTbcClose(pCursor->pCur);
  taosMemoryFree(buf);
  // NOTE(review): assumes metaCreateTagIdxKey() allocates the key with
  // taosMemoryMalloc(); confirm against its definition.
  taosMemoryFree(pKey);

  taosMemoryFree(pCursor);

  return ret;
}