metaQuery.c 19.6 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "meta.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) {
H
Hongze Cheng 已提交
19 20
  memset(pReader, 0, sizeof(*pReader));
  pReader->flags = flags;
H
Hongze Cheng 已提交
21
  pReader->pMeta = pMeta;
H
Hongze Cheng 已提交
22
  metaRLock(pMeta);
H
Hongze Cheng 已提交
23
}
H
Hongze Cheng 已提交
24

H
Hongze Cheng 已提交
25
void metaReaderClear(SMetaReader *pReader) {
H
Hongze Cheng 已提交
26 27 28
  if (pReader->pMeta) {
    metaULock(pReader->pMeta);
  }
H
Hongze Cheng 已提交
29
  tDecoderClear(&pReader->coder);
H
Hongze Cheng 已提交
30
  tdbFree(pReader->pBuf);
H
Hongze Cheng 已提交
31 32
}

H
Hongze Cheng 已提交
33
int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t uid) {
H
Hongze Cheng 已提交
34
  SMeta   *pMeta = pReader->pMeta;
H
Hongze Cheng 已提交
35 36
  STbDbKey tbDbKey = {.version = version, .uid = uid};

H
Hongze Cheng 已提交
37
  // query table.db
H
Hongze Cheng 已提交
38
  if (tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pReader->pBuf, &pReader->szBuf) < 0) {
H
Hongze Cheng 已提交
39
    terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
H
Hongze Cheng 已提交
40 41 42 43
    goto _err;
  }

  // decode the entry
H
Hongze Cheng 已提交
44
  tDecoderInit(&pReader->coder, pReader->pBuf, pReader->szBuf);
H
Hongze Cheng 已提交
45 46 47 48 49 50 51 52 53 54 55

  if (metaDecodeEntry(&pReader->coder, &pReader->me) < 0) {
    goto _err;
  }

  return 0;

_err:
  return -1;
}

H
Hongze Cheng 已提交
56
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
H
Hongze Cheng 已提交
57
  SMeta  *pMeta = pReader->pMeta;
H
Hongze Cheng 已提交
58 59 60
  int64_t version;

  // query uid.idx
H
Hongze Cheng 已提交
61
  if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pReader->pBuf, &pReader->szBuf) < 0) {
H
Hongze Cheng 已提交
62
    terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
H
Hongze Cheng 已提交
63 64 65 66
    return -1;
  }

  version = *(int64_t *)pReader->pBuf;
H
Hongze Cheng 已提交
67
  return metaGetTableEntryByVersion(pReader, version, uid);
H
Hongze Cheng 已提交
68 69
}

H
Hongze Cheng 已提交
70
int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
H
Hongze Cheng 已提交
71
  SMeta   *pMeta = pReader->pMeta;
H
Hongze Cheng 已提交
72 73 74
  tb_uid_t uid;

  // query name.idx
H
Hongze Cheng 已提交
75
  if (tdbTbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pReader->pBuf, &pReader->szBuf) < 0) {
H
Hongze Cheng 已提交
76
    terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
H
Hongze Cheng 已提交
77 78 79 80
    return -1;
  }

  uid = *(tb_uid_t *)pReader->pBuf;
H
Hongze Cheng 已提交
81
  return metaGetTableEntryByUid(pReader, uid);
H
Hongze Cheng 已提交
82 83
}

H
Hongze Cheng 已提交
84
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {
H
Hongze Cheng 已提交
85
  void    *pData = NULL;
H
Hongze Cheng 已提交
86 87 88
  int      nData = 0;
  tb_uid_t uid = 0;

wmmhello's avatar
wmmhello 已提交
89 90
  metaRLock(pMeta);

H
Hongze Cheng 已提交
91 92 93 94 95
  if (tdbTbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pData, &nData) == 0) {
    uid = *(tb_uid_t *)pData;
    tdbFree(pData);
  }

wmmhello's avatar
wmmhello 已提交
96 97
  metaULock(pMeta);

98
  return uid;
H
Hongze Cheng 已提交
99 100
}

H
Hongze Cheng 已提交
101
int metaReadNext(SMetaReader *pReader) {
H
Hongze Cheng 已提交
102 103
  SMeta *pMeta = pReader->pMeta;

H
Hongze Cheng 已提交
104
  // TODO
H
Hongze Cheng 已提交
105

H
Hongze Cheng 已提交
106 107 108 109
  return 0;
}

#if 1  // ===================================================
H
Hongze Cheng 已提交
110 111 112 113 114 115 116 117
SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
  SMTbCursor *pTbCur = NULL;

  pTbCur = (SMTbCursor *)taosMemoryCalloc(1, sizeof(*pTbCur));
  if (pTbCur == NULL) {
    return NULL;
  }

H
Hongze Cheng 已提交
118
  metaReaderInit(&pTbCur->mr, pMeta, 0);
H
Hongze Cheng 已提交
119

H
Hongze Cheng 已提交
120
  tdbTbcOpen(pMeta->pUidIdx, &pTbCur->pDbc, NULL);
H
Hongze Cheng 已提交
121

H
Hongze Cheng 已提交
122
  tdbTbcMoveToFirst(pTbCur->pDbc);
H
Hongze Cheng 已提交
123

H
Hongze Cheng 已提交
124 125 126 127 128
  return pTbCur;
}

void metaCloseTbCursor(SMTbCursor *pTbCur) {
  if (pTbCur) {
H
Hongze Cheng 已提交
129 130
    tdbFree(pTbCur->pKey);
    tdbFree(pTbCur->pVal);
H
Hongze Cheng 已提交
131
    metaReaderClear(&pTbCur->mr);
H
Hongze Cheng 已提交
132
    if (pTbCur->pDbc) {
H
Hongze Cheng 已提交
133
      tdbTbcClose(pTbCur->pDbc);
H
Hongze Cheng 已提交
134 135 136 137 138
    }
    taosMemoryFree(pTbCur);
  }
}

H
Hongze Cheng 已提交
139
int metaTbCursorNext(SMTbCursor *pTbCur) {
H
Hongze Cheng 已提交
140
  int    ret;
H
Hongze Cheng 已提交
141
  void  *pBuf;
H
Hongze Cheng 已提交
142 143 144
  STbCfg tbCfg;

  for (;;) {
H
Hongze Cheng 已提交
145
    ret = tdbTbcNext(pTbCur->pDbc, &pTbCur->pKey, &pTbCur->kLen, &pTbCur->pVal, &pTbCur->vLen);
H
Hongze Cheng 已提交
146 147
    if (ret < 0) {
      return -1;
H
Hongze Cheng 已提交
148 149
    }

H
Hongze Cheng 已提交
150
    metaGetTableEntryByVersion(&pTbCur->mr, *(int64_t *)pTbCur->pVal, *(tb_uid_t *)pTbCur->pKey);
H
Hongze Cheng 已提交
151
    if (pTbCur->mr.me.type == TSDB_SUPER_TABLE) {
H
Hongze Cheng 已提交
152 153
      continue;
    }
H
Hongze Cheng 已提交
154 155

    break;
H
Hongze Cheng 已提交
156 157
  }

H
Hongze Cheng 已提交
158
  return 0;
H
Hongze Cheng 已提交
159 160
}

H
Hongze Cheng 已提交
161
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) {
H
Hongze Cheng 已提交
162
  void           *pData = NULL;
H
Hongze Cheng 已提交
163 164 165 166 167 168
  int             nData = 0;
  int64_t         version;
  SSchemaWrapper  schema = {0};
  SSchemaWrapper *pSchema = NULL;
  SDecoder        dc = {0};

H
fix  
Hongze Cheng 已提交
169
  metaRLock(pMeta);
H
Hongze Cheng 已提交
170 171 172 173
_query:
  if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData) < 0) {
    goto _err;
  }
H
Hongze Cheng 已提交
174

H
Hongze Cheng 已提交
175
  version = *(int64_t *)pData;
H
Hongze Cheng 已提交
176

H
Hongze Cheng 已提交
177 178 179 180 181 182
  tdbTbGet(pMeta->pTbDb, &(STbDbKey){.uid = uid, .version = version}, sizeof(STbDbKey), &pData, &nData);
  SMetaEntry me = {0};
  tDecoderInit(&dc, pData, nData);
  metaDecodeEntry(&dc, &me);
  if (me.type == TSDB_SUPER_TABLE) {
    if (sver == -1 || sver == me.stbEntry.schemaRow.version) {
H
Hongze Cheng 已提交
183
      pSchema = tCloneSSchemaWrapper(&me.stbEntry.schemaRow);
H
Hongze Cheng 已提交
184 185
      tDecoderClear(&dc);
      goto _exit;
H
Hongze Cheng 已提交
186
    }
H
Hongze Cheng 已提交
187 188
  } else if (me.type == TSDB_CHILD_TABLE) {
    uid = me.ctbEntry.suid;
H
Hongze Cheng 已提交
189
    tDecoderClear(&dc);
H
Hongze Cheng 已提交
190
    goto _query;
H
Hongze Cheng 已提交
191
  } else {
H
Hongze Cheng 已提交
192 193 194 195
    if (sver == -1 || sver == me.ntbEntry.schemaRow.version) {
      pSchema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow);
      tDecoderClear(&dc);
      goto _exit;
H
Hongze Cheng 已提交
196
    }
H
Hongze Cheng 已提交
197 198
  }
  tDecoderClear(&dc);
H
Hongze Cheng 已提交
199

H
Hongze Cheng 已提交
200 201 202
  // query from skm db
  if (tdbTbGet(pMeta->pSkmDb, &(SSkmDbKey){.uid = uid, .sver = sver}, sizeof(SSkmDbKey), &pData, &nData) < 0) {
    goto _err;
H
Hongze Cheng 已提交
203
  }
H
Hongze Cheng 已提交
204

H
Hongze Cheng 已提交
205 206 207 208 209 210
  tDecoderInit(&dc, pData, nData);
  tDecodeSSchemaWrapper(&dc, &schema);
  pSchema = tCloneSSchemaWrapper(&schema);
  tDecoderClear(&dc);

_exit:
H
Hongze Cheng 已提交
211 212 213
  metaULock(pMeta);
  tdbFree(pData);
  return pSchema;
H
Hongze Cheng 已提交
214

H
Hongze Cheng 已提交
215 216 217 218
_err:
  metaULock(pMeta);
  tdbFree(pData);
  return NULL;
H
Hongze Cheng 已提交
219 220
}

H
Hongze Cheng 已提交
221 222 223
int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList) {
  TBC *pCur;
  int  ret = tdbTbcOpen(pMeta->pTtlIdx, &pCur, NULL);
224 225 226 227 228 229 230 231 232 233 234 235 236 237
  if (ret < 0) {
    return ret;
  }

  STtlIdxKey ttlKey = {0};
  ttlKey.dtime = ttl;
  ttlKey.uid = INT64_MAX;
  int c = 0;
  tdbTbcMoveTo(pCur, &ttlKey, sizeof(ttlKey), &c);
  if (c < 0) {
    tdbTbcMoveToPrev(pCur);
  }

  void *pKey = NULL;
H
Hongze Cheng 已提交
238 239
  int   kLen = 0;
  while (1) {
240 241 242 243
    ret = tdbTbcPrev(pCur, &pKey, &kLen, NULL, NULL);
    if (ret < 0) {
      break;
    }
H
Hongze Cheng 已提交
244
    ttlKey = *(STtlIdxKey *)pKey;
245 246 247 248 249
    taosArrayPush(uidList, &ttlKey.uid);
  }
  tdbTbcClose(pCur);

  tdbFree(pKey);
wmmhello's avatar
wmmhello 已提交
250

251 252 253
  return 0;
}

H
Hongze Cheng 已提交
254
struct SMCtbCursor {
H
Hongze Cheng 已提交
255 256
  SMeta   *pMeta;
  TBC     *pCur;
H
Hongze Cheng 已提交
257
  tb_uid_t suid;
H
Hongze Cheng 已提交
258 259
  void    *pKey;
  void    *pVal;
H
Hongze Cheng 已提交
260 261 262 263
  int      kLen;
  int      vLen;
};

H
Hongze Cheng 已提交
264 265
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
  SMCtbCursor *pCtbCur = NULL;
H
Hongze Cheng 已提交
266
  SCtbIdxKey   ctbIdxKey;
C
Cary Xu 已提交
267 268
  int          ret = 0;
  int          c = 0;
H
Hongze Cheng 已提交
269

H
Hongze Cheng 已提交
270 271 272 273
  pCtbCur = (SMCtbCursor *)taosMemoryCalloc(1, sizeof(*pCtbCur));
  if (pCtbCur == NULL) {
    return NULL;
  }
H
Hongze Cheng 已提交
274

H
fix  
Hongze Cheng 已提交
275
  pCtbCur->pMeta = pMeta;
H
Hongze Cheng 已提交
276
  pCtbCur->suid = uid;
H
fix  
Hongze Cheng 已提交
277 278
  metaRLock(pMeta);

H
Hongze Cheng 已提交
279
  ret = tdbTbcOpen(pMeta->pCtbIdx, &pCtbCur->pCur, NULL);
H
Hongze Cheng 已提交
280
  if (ret < 0) {
H
fix  
Hongze Cheng 已提交
281
    metaULock(pMeta);
H
Hongze Cheng 已提交
282 283 284
    taosMemoryFree(pCtbCur);
    return NULL;
  }
H
Hongze Cheng 已提交
285

H
Hongze Cheng 已提交
286 287 288
  // move to the suid
  ctbIdxKey.suid = uid;
  ctbIdxKey.uid = INT64_MIN;
H
Hongze Cheng 已提交
289
  tdbTbcMoveTo(pCtbCur->pCur, &ctbIdxKey, sizeof(ctbIdxKey), &c);
H
Hongze Cheng 已提交
290
  if (c > 0) {
H
Hongze Cheng 已提交
291
    tdbTbcMoveToNext(pCtbCur->pCur);
H
Hongze Cheng 已提交
292 293
  }

H
Hongze Cheng 已提交
294 295 296
  return pCtbCur;
}

C
Cary Xu 已提交
297
void metaCloseCtbCursor(SMCtbCursor *pCtbCur) {
H
Hongze Cheng 已提交
298
  if (pCtbCur) {
H
fix  
Hongze Cheng 已提交
299
    if (pCtbCur->pMeta) metaULock(pCtbCur->pMeta);
H
Hongze Cheng 已提交
300
    if (pCtbCur->pCur) {
H
Hongze Cheng 已提交
301
      tdbTbcClose(pCtbCur->pCur);
H
Hongze Cheng 已提交
302

H
Hongze Cheng 已提交
303 304
      tdbFree(pCtbCur->pKey);
      tdbFree(pCtbCur->pVal);
H
Hongze Cheng 已提交
305
    }
H
Hongze Cheng 已提交
306

H
Hongze Cheng 已提交
307 308
    taosMemoryFree(pCtbCur);
  }
H
Hongze Cheng 已提交
309 310 311
}

tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
H
Hongze Cheng 已提交
312 313
  int         ret;
  SCtbIdxKey *pCtbIdxKey;
H
Hongze Cheng 已提交
314

H
Hongze Cheng 已提交
315
  ret = tdbTbcNext(pCtbCur->pCur, &pCtbCur->pKey, &pCtbCur->kLen, &pCtbCur->pVal, &pCtbCur->vLen);
H
Hongze Cheng 已提交
316 317 318
  if (ret < 0) {
    return 0;
  }
H
Hongze Cheng 已提交
319

H
Hongze Cheng 已提交
320
  pCtbIdxKey = pCtbCur->pKey;
H
Hongze Cheng 已提交
321 322 323
  if (pCtbIdxKey->suid > pCtbCur->suid) {
    return 0;
  }
H
Hongze Cheng 已提交
324

H
Hongze Cheng 已提交
325
  return pCtbIdxKey->uid;
H
Hongze Cheng 已提交
326 327
}

C
Cary Xu 已提交
328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344
struct SMStbCursor {
  SMeta   *pMeta;
  TBC     *pCur;
  tb_uid_t suid;
  void    *pKey;
  void    *pVal;
  int      kLen;
  int      vLen;
};

SMStbCursor *metaOpenStbCursor(SMeta *pMeta, tb_uid_t suid) {
  SMStbCursor *pStbCur = NULL;
  int          ret = 0;
  int          c = 0;

  pStbCur = (SMStbCursor *)taosMemoryCalloc(1, sizeof(*pStbCur));
  if (pStbCur == NULL) {
C
Cary Xu 已提交
345
    terrno = TSDB_CODE_OUT_OF_MEMORY;
C
Cary Xu 已提交
346 347 348 349 350 351 352 353 354
    return NULL;
  }

  pStbCur->pMeta = pMeta;
  pStbCur->suid = suid;
  metaRLock(pMeta);

  ret = tdbTbcOpen(pMeta->pSuidIdx, &pStbCur->pCur, NULL);
  if (ret < 0) {
C
Cary Xu 已提交
355
    terrno = TSDB_CODE_OUT_OF_MEMORY;
C
Cary Xu 已提交
356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390
    metaULock(pMeta);
    taosMemoryFree(pStbCur);
    return NULL;
  }

  // move to the suid
  tdbTbcMoveTo(pStbCur->pCur, &suid, sizeof(suid), &c);
  if (c > 0) {
    tdbTbcMoveToNext(pStbCur->pCur);
  }

  return pStbCur;
}

void metaCloseStbCursor(SMStbCursor *pStbCur) {
  if (pStbCur) {
    if (pStbCur->pMeta) metaULock(pStbCur->pMeta);
    if (pStbCur->pCur) {
      tdbTbcClose(pStbCur->pCur);

      tdbFree(pStbCur->pKey);
      tdbFree(pStbCur->pVal);
    }

    taosMemoryFree(pStbCur);
  }
}

tb_uid_t metaStbCursorNext(SMStbCursor *pStbCur) {
  int ret;

  ret = tdbTbcNext(pStbCur->pCur, &pStbCur->pKey, &pStbCur->kLen, &pStbCur->pVal, &pStbCur->vLen);
  if (ret < 0) {
    return 0;
  }
H
Hongze Cheng 已提交
391
  return *(tb_uid_t *)pStbCur->pKey;
C
Cary Xu 已提交
392 393
}

H
Hongze Cheng 已提交
394
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
H
Hongze Cheng 已提交
395
  // SMetaReader     mr = {0};
H
Hongze Cheng 已提交
396
  STSchema       *pTSchema = NULL;
H
Hongze Cheng 已提交
397 398
  SSchemaWrapper *pSW = NULL;
  STSchemaBuilder sb = {0};
H
Hongze Cheng 已提交
399
  SSchema        *pSchema;
H
Hongze Cheng 已提交
400

H
Hongze Cheng 已提交
401
  pSW = metaGetTableSchema(pMeta, uid, sver, 0);
C
Cary Xu 已提交
402
  if (!pSW) return NULL;
H
fix  
Hongze Cheng 已提交
403

404
  tdInitTSchemaBuilder(&sb, pSW->version);
H
Hongze Cheng 已提交
405 406 407 408 409
  for (int i = 0; i < pSW->nCols; i++) {
    pSchema = pSW->pSchema + i;
    tdAddColToSchema(&sb, pSchema->type, pSchema->flags, pSchema->colId, pSchema->bytes);
  }
  pTSchema = tdGetSchemaFromBuilder(&sb);
C
Cary Xu 已提交
410

H
Hongze Cheng 已提交
411 412
  tdDestroyTSchemaBuilder(&sb);

H
Hongze Cheng 已提交
413 414
  taosMemoryFree(pSW->pSchema);
  taosMemoryFree(pSW);
H
Hongze Cheng 已提交
415 416 417
  return pTSchema;
}

H
Hongze Cheng 已提交
418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441
int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema **ppTSchema) {
  int32_t   code = 0;
  STSchema *pTSchema = NULL;
  SSkmDbKey skmDbKey = {.uid = suid ? suid : uid, .sver = sver};
  void     *pData = NULL;
  int       nData = 0;

  // query
  metaRLock(pMeta);
  if (tdbTbGet(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), &pData, &nData) < 0) {
    code = TSDB_CODE_NOT_FOUND;
    metaULock(pMeta);
    goto _err;
  }
  metaULock(pMeta);

  // decode
  SDecoder        dc = {0};
  SSchemaWrapper  schema;
  SSchemaWrapper *pSchemaWrapper = &schema;

  tDecoderInit(&dc, pData, nData);
  tDecodeSSchemaWrapper(&dc, pSchemaWrapper);
  tDecoderClear(&dc);
442
  tdbFree(pData);
H
Hongze Cheng 已提交
443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463

  // convert
  STSchemaBuilder sb = {0};

  tdInitTSchemaBuilder(&sb, pSchemaWrapper->version);
  for (int i = 0; i < pSchemaWrapper->nCols; i++) {
    SSchema *pSchema = pSchemaWrapper->pSchema + i;
    tdAddColToSchema(&sb, pSchema->type, pSchema->flags, pSchema->colId, pSchema->bytes);
  }
  pTSchema = tdGetSchemaFromBuilder(&sb);
  tdDestroyTSchemaBuilder(&sb);

  *ppTSchema = pTSchema;
  taosMemoryFree(pSchemaWrapper->pSchema);
  return code;

_err:
  *ppTSchema = NULL;
  return code;
}

C
Cary Xu 已提交
464 465 466 467 468
int metaGetTbNum(SMeta *pMeta) {
  // TODO
  // ASSERT(0);
  return 0;
}
H
Hongze Cheng 已提交
469

C
Cary Xu 已提交
470
typedef struct {
H
Hongze Cheng 已提交
471 472
  SMeta   *pMeta;
  TBC     *pCur;
C
Cary Xu 已提交
473
  tb_uid_t uid;
H
Hongze Cheng 已提交
474 475
  void    *pKey;
  void    *pVal;
C
Cary Xu 已提交
476 477 478 479 480 481 482 483 484 485 486 487 488
  int      kLen;
  int      vLen;
} SMSmaCursor;

SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) {
  SMSmaCursor *pSmaCur = NULL;
  SSmaIdxKey   smaIdxKey;
  int          ret;
  int          c;

  pSmaCur = (SMSmaCursor *)taosMemoryCalloc(1, sizeof(*pSmaCur));
  if (pSmaCur == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
489 490 491
    return NULL;
  }

C
Cary Xu 已提交
492 493 494 495
  pSmaCur->pMeta = pMeta;
  pSmaCur->uid = uid;
  metaRLock(pMeta);

H
Hongze Cheng 已提交
496
  ret = tdbTbcOpen(pMeta->pSmaIdx, &pSmaCur->pCur, NULL);
C
Cary Xu 已提交
497 498 499
  if (ret < 0) {
    metaULock(pMeta);
    taosMemoryFree(pSmaCur);
H
Hongze Cheng 已提交
500 501 502
    return NULL;
  }

C
Cary Xu 已提交
503 504 505
  // move to the suid
  smaIdxKey.uid = uid;
  smaIdxKey.smaUid = INT64_MIN;
H
Hongze Cheng 已提交
506
  tdbTbcMoveTo(pSmaCur->pCur, &smaIdxKey, sizeof(smaIdxKey), &c);
C
Cary Xu 已提交
507
  if (c > 0) {
H
Hongze Cheng 已提交
508
    tdbTbcMoveToNext(pSmaCur->pCur);
C
Cary Xu 已提交
509
  }
H
Hongze Cheng 已提交
510

C
Cary Xu 已提交
511 512
  return pSmaCur;
}
H
Hongze Cheng 已提交
513

C
Cary Xu 已提交
514 515 516 517
void metaCloseSmaCursor(SMSmaCursor *pSmaCur) {
  if (pSmaCur) {
    if (pSmaCur->pMeta) metaULock(pSmaCur->pMeta);
    if (pSmaCur->pCur) {
H
Hongze Cheng 已提交
518
      tdbTbcClose(pSmaCur->pCur);
H
Hongze Cheng 已提交
519

C
Cary Xu 已提交
520 521 522
      tdbFree(pSmaCur->pKey);
      tdbFree(pSmaCur->pVal);
    }
H
Hongze Cheng 已提交
523

C
Cary Xu 已提交
524 525 526 527 528 529 530 531
    taosMemoryFree(pSmaCur);
  }
}

tb_uid_t metaSmaCursorNext(SMSmaCursor *pSmaCur) {
  int         ret;
  SSmaIdxKey *pSmaIdxKey;

H
Hongze Cheng 已提交
532
  ret = tdbTbcNext(pSmaCur->pCur, &pSmaCur->pKey, &pSmaCur->kLen, &pSmaCur->pVal, &pSmaCur->vLen);
C
Cary Xu 已提交
533 534 535 536 537 538 539 540 541 542 543 544 545 546
  if (ret < 0) {
    return 0;
  }

  pSmaIdxKey = pSmaCur->pKey;
  if (pSmaIdxKey->uid > pSmaCur->uid) {
    return 0;
  }

  return pSmaIdxKey->uid;
}

STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
  STSmaWrapper *pSW = NULL;
H
Hongze Cheng 已提交
547
  SArray       *pSmaIds = NULL;
C
Cary Xu 已提交
548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570

  if (!(pSmaIds = metaGetSmaIdsByTable(pMeta, uid))) {
    return NULL;
  }

  pSW = taosMemoryCalloc(1, sizeof(*pSW));
  if (!pSW) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pSW->number = taosArrayGetSize(pSmaIds);
  pSW->tSma = taosMemoryCalloc(pSW->number, sizeof(STSma));

  if (!pSW->tSma) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  SMetaReader mr = {0};
  metaReaderInit(&mr, pMeta, 0);
  int64_t smaId;
  int     smaIdx = 0;
H
Hongze Cheng 已提交
571
  STSma  *pTSma = NULL;
C
Cary Xu 已提交
572 573 574
  for (int i = 0; i < pSW->number; ++i) {
    smaId = *(tb_uid_t *)taosArrayGet(pSmaIds, i);
    if (metaGetTableEntryByUid(&mr, smaId) < 0) {
S
Shengliang Guan 已提交
575
      metaWarn("vgId:%d, no entry for tbId: %" PRIi64 ", smaId: %" PRIi64, TD_VID(pMeta->pVnode), uid, smaId);
C
Cary Xu 已提交
576 577 578 579 580 581 582 583 584 585
      continue;
    }
    pTSma = pSW->tSma + smaIdx;
    memcpy(pTSma, mr.me.smaEntry.tsma, sizeof(STSma));
    if (deepCopy) {
      if (pTSma->exprLen > 0) {
        if (!(pTSma->expr = taosMemoryCalloc(1, pTSma->exprLen))) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          goto _err;
        }
H
Hongze Cheng 已提交
586
        memcpy((void *)pTSma->expr, mr.me.smaEntry.tsma->expr, pTSma->exprLen);
H
Hongze Cheng 已提交
587
      }
C
Cary Xu 已提交
588 589 590 591 592
      if (pTSma->tagsFilterLen > 0) {
        if (!(pTSma->tagsFilter = taosMemoryCalloc(1, pTSma->tagsFilterLen))) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          goto _err;
        }
H
Hongze Cheng 已提交
593
      }
H
Hongze Cheng 已提交
594
      memcpy((void *)pTSma->tagsFilter, mr.me.smaEntry.tsma->tagsFilter, pTSma->tagsFilterLen);
C
Cary Xu 已提交
595 596 597 598 599
    } else {
      pTSma->exprLen = 0;
      pTSma->expr = NULL;
      pTSma->tagsFilterLen = 0;
      pTSma->tagsFilter = NULL;
H
Hongze Cheng 已提交
600
    }
H
Hongze Cheng 已提交
601

C
Cary Xu 已提交
602
    ++smaIdx;
H
Hongze Cheng 已提交
603 604
  }

C
Cary Xu 已提交
605 606
  if (smaIdx <= 0) goto _err;
  pSW->number = smaIdx;
H
Hongze Cheng 已提交
607

C
Cary Xu 已提交
608 609
  metaReaderClear(&mr);
  taosArrayDestroy(pSmaIds);
H
Hongze Cheng 已提交
610
  return pSW;
C
Cary Xu 已提交
611 612 613
_err:
  metaReaderClear(&mr);
  taosArrayDestroy(pSmaIds);
C
Cary Xu 已提交
614
  tFreeTSmaWrapper(pSW, deepCopy);
H
Hongze Cheng 已提交
615 616 617
  return NULL;
}

C
Cary Xu 已提交
618
STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
H
Hongze Cheng 已提交
619
  STSma      *pTSma = NULL;
C
Cary Xu 已提交
620 621 622
  SMetaReader mr = {0};
  metaReaderInit(&mr, pMeta, 0);
  if (metaGetTableEntryByUid(&mr, indexUid) < 0) {
S
Shengliang Guan 已提交
623
    metaWarn("vgId:%d, failed to get table entry for smaId: %" PRIi64, TD_VID(pMeta->pVnode), indexUid);
C
Cary Xu 已提交
624 625 626 627 628 629 630 631 632 633 634 635 636 637
    metaReaderClear(&mr);
    return NULL;
  }
  pTSma = (STSma *)taosMemoryMalloc(sizeof(STSma));
  if (!pTSma) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    metaReaderClear(&mr);
    return NULL;
  }

  memcpy(pTSma, mr.me.smaEntry.tsma, sizeof(STSma));

  metaReaderClear(&mr);
  return pTSma;
H
Hongze Cheng 已提交
638 639
}

C
Cary Xu 已提交
640
SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) {
H
Hongze Cheng 已提交
641
  SArray     *pUids = NULL;
C
Cary Xu 已提交
642
  SSmaIdxKey *pSmaIdxKey = NULL;
H
Hongze Cheng 已提交
643

C
Cary Xu 已提交
644 645
  SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid);
  if (!pCur) {
H
Hongze Cheng 已提交
646 647 648
    return NULL;
  }

C
Cary Xu 已提交
649 650 651 652 653
  while (1) {
    tb_uid_t id = metaSmaCursorNext(pCur);
    if (id == 0) {
      break;
    }
H
Hongze Cheng 已提交
654

C
Cary Xu 已提交
655 656
    if (!pUids) {
      pUids = taosArrayInit(16, sizeof(tb_uid_t));
H
Hongze Cheng 已提交
657
      if (!pUids) {
C
Cary Xu 已提交
658 659 660
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        metaCloseSmaCursor(pCur);
        return NULL;
H
Hongze Cheng 已提交
661
      }
C
Cary Xu 已提交
662
    }
H
Hongze Cheng 已提交
663

C
Cary Xu 已提交
664
    pSmaIdxKey = (SSmaIdxKey *)pCur->pKey;
H
Hongze Cheng 已提交
665

C
Cary Xu 已提交
666 667 668 669 670
    if (taosArrayPush(pUids, &pSmaIdxKey->smaUid) < 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      metaCloseSmaCursor(pCur);
      taosArrayDestroy(pUids);
      return NULL;
H
Hongze Cheng 已提交
671 672 673 674 675 676 677
    }
  }

  metaCloseSmaCursor(pCur);
  return pUids;
}

C
Cary Xu 已提交
678
SArray *metaGetSmaTbUids(SMeta *pMeta) {
H
Hongze Cheng 已提交
679
  SArray     *pUids = NULL;
C
Cary Xu 已提交
680 681 682 683 684
  SSmaIdxKey *pSmaIdxKey = NULL;
  tb_uid_t    lastUid = 0;

  SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, 0);
  if (!pCur) {
H
Hongze Cheng 已提交
685 686 687
    return NULL;
  }

C
Cary Xu 已提交
688 689 690 691 692
  while (1) {
    tb_uid_t uid = metaSmaCursorNext(pCur);
    if (uid == 0) {
      break;
    }
H
Hongze Cheng 已提交
693

C
Cary Xu 已提交
694 695 696
    if (lastUid == uid) {
      continue;
    }
H
Hongze Cheng 已提交
697

C
Cary Xu 已提交
698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714
    lastUid = uid;

    if (!pUids) {
      pUids = taosArrayInit(16, sizeof(tb_uid_t));
      if (!pUids) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        metaCloseSmaCursor(pCur);
        return NULL;
      }
    }

    if (taosArrayPush(pUids, &uid) < 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      metaCloseSmaCursor(pCur);
      taosArrayDestroy(pUids);
      return NULL;
    }
H
Hongze Cheng 已提交
715 716
  }

C
Cary Xu 已提交
717 718
  metaCloseSmaCursor(pCur);
  return pUids;
H
Hongze Cheng 已提交
719 720
}

L
Liu Jicong 已提交
721
#endif
H
Hongze Cheng 已提交
722

wmmhello's avatar
wmmhello 已提交
723
const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t type, STagVal *val) {
H
Hongze Cheng 已提交
724
  ASSERT(pEntry->type == TSDB_CHILD_TABLE);
wmmhello's avatar
wmmhello 已提交
725
  STag *tag = (STag *)pEntry->ctbEntry.pTags;
dengyihao's avatar
dengyihao 已提交
726
  if (type == TSDB_DATA_TYPE_JSON) {
wmmhello's avatar
wmmhello 已提交
727 728
    return tag;
  }
wmmhello's avatar
wmmhello 已提交
729 730
  bool find = tTagGet(tag, val);

dengyihao's avatar
dengyihao 已提交
731
  if (!find) {
wmmhello's avatar
wmmhello 已提交
732 733 734
    return NULL;
  }
  return val;
dengyihao's avatar
dengyihao 已提交
735
}
wmmhello's avatar
wmmhello 已提交
736 737

typedef struct {
H
Hongze Cheng 已提交
738 739
  SMeta   *pMeta;
  TBC     *pCur;
wmmhello's avatar
wmmhello 已提交
740 741 742
  tb_uid_t suid;
  int16_t  cid;
  int16_t  type;
H
Hongze Cheng 已提交
743 744
  void    *pKey;
  void    *pVal;
wmmhello's avatar
wmmhello 已提交
745 746 747 748 749 750
  int32_t  kLen;
  int32_t  vLen;
} SIdxCursor;

int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
  SIdxCursor *pCursor = NULL;
H
Hongze Cheng 已提交
751
  char       *buf = NULL;
dengyihao's avatar
dengyihao 已提交
752
  int32_t     maxSize = 0;
wmmhello's avatar
wmmhello 已提交
753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769

  int32_t ret = 0, valid = 0;
  pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor));
  pCursor->pMeta = pMeta;
  pCursor->suid = param->suid;
  pCursor->cid = param->cid;
  pCursor->type = param->type;

  metaRLock(pMeta);
  ret = tdbTbcOpen(pMeta->pTagIdx, &pCursor->pCur, NULL);
  if (ret < 0) {
    goto END;
  }
  STagIdxKey *pKey = NULL;
  int32_t     nKey = 0;

  int32_t nTagData = 0;
H
Hongze Cheng 已提交
770
  void   *tagData = NULL;
dengyihao's avatar
dengyihao 已提交
771

dengyihao's avatar
dengyihao 已提交
772
  if (param->val == NULL) {
S
Shengliang Guan 已提交
773
    metaError("vgId:%d, failed to filter NULL data", TD_VID(pMeta->pVnode));
dengyihao's avatar
dengyihao 已提交
774
    return -1;
dengyihao's avatar
dengyihao 已提交
775
  } else {
dengyihao's avatar
dengyihao 已提交
776 777 778 779 780 781 782 783 784 785
    if (IS_VAR_DATA_TYPE(param->type)) {
      tagData = varDataVal(param->val);
      nTagData = varDataLen(param->val);

      if (param->type == TSDB_DATA_TYPE_NCHAR) {
        maxSize = 4 * nTagData + 1;
        buf = taosMemoryCalloc(1, maxSize);
        if (false == taosMbsToUcs4(tagData, nTagData, (TdUcs4 *)buf, maxSize, &maxSize)) {
          goto END;
        }
dengyihao's avatar
dengyihao 已提交
786

dengyihao's avatar
dengyihao 已提交
787 788 789 790 791 792
        tagData = buf;
        nTagData = maxSize;
      }
    } else {
      tagData = param->val;
      nTagData = tDataTypes[param->type].bytes;
dengyihao's avatar
dengyihao 已提交
793
    }
wmmhello's avatar
wmmhello 已提交
794
  }
dengyihao's avatar
dengyihao 已提交
795
  ret = metaCreateTagIdxKey(pCursor->suid, pCursor->cid, tagData, nTagData, pCursor->type,
wmmhello's avatar
wmmhello 已提交
796
                            param->reverse ? INT64_MAX : INT64_MIN, &pKey, &nKey);
dengyihao's avatar
dengyihao 已提交
797

wmmhello's avatar
wmmhello 已提交
798 799 800 801 802 803 804
  if (ret != 0) {
    goto END;
  }
  int cmp = 0;
  if (tdbTbcMoveTo(pCursor->pCur, pKey, nKey, &cmp) < 0) {
    goto END;
  }
dengyihao's avatar
dengyihao 已提交
805

H
Hongze Cheng 已提交
806
  void   *entryKey = NULL, *entryVal = NULL;
wmmhello's avatar
wmmhello 已提交
807
  int32_t nEntryKey, nEntryVal;
dengyihao's avatar
dengyihao 已提交
808
  bool    first = true;
wmmhello's avatar
wmmhello 已提交
809 810 811 812 813 814
  while (1) {
    valid = tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, (const void **)&entryVal, &nEntryVal);
    if (valid < 0) {
      break;
    }
    STagIdxKey *p = entryKey;
dengyihao's avatar
dengyihao 已提交
815 816 817 818 819 820 821 822 823 824
    if (p->type != pCursor->type) {
      if (first) {
        valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
        if (valid < 0) break;
        continue;
      } else {
        break;
      }
    }
    first = false;
wmmhello's avatar
wmmhello 已提交
825 826 827 828
    if (p != NULL) {
      int32_t cmp = (*param->filterFunc)(p->data, pKey->data, pKey->type);
      if (cmp == 0) {
        // match
dengyihao's avatar
dengyihao 已提交
829 830 831 832 833 834
        tb_uid_t tuid = 0;
        if (IS_VAR_DATA_TYPE(pKey->type)) {
          tuid = *(tb_uid_t *)(p->data + varDataTLen(p->data));
        } else {
          tuid = *(tb_uid_t *)(p->data + tDataTypes[pCursor->type].bytes);
        }
wmmhello's avatar
wmmhello 已提交
835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850
        taosArrayPush(pUids, &tuid);
      } else if (cmp == 1) {
        // not match but should continue to iter
      } else {
        // not match and no more result
        break;
      }
    }
    valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
    if (valid < 0) {
      break;
    }
  }
END:
  if (pCursor->pMeta) metaULock(pCursor->pMeta);
  if (pCursor->pCur) tdbTbcClose(pCursor->pCur);
dengyihao's avatar
dengyihao 已提交
851
  taosMemoryFree(buf);
wmmhello's avatar
wmmhello 已提交
852 853 854 855

  taosMemoryFree(pCursor);

  return ret;
dengyihao's avatar
dengyihao 已提交
856
}