metaQuery.c 19.8 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "meta.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
// Prepare `pReader` for lookups against `pMeta`.
//
// The reader is wiped to all-zero, bound to `pMeta`, tagged with `flags`, and
// the meta read lock is taken. The lock stays held until metaReaderClear() is
// called on the same reader.
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) {
  memset(pReader, 0, sizeof(SMetaReader));

  pReader->pMeta = pMeta;
  pReader->flags = flags;

  metaRLock(pReader->pMeta);
}
H
Hongze Cheng 已提交
24

H
Hongze Cheng 已提交
25
// Release everything a reader holds: the meta read lock (only when the reader
// is bound to a meta handle), the decoder state, and the value buffer that
// the tdbTbGet() lookups filled in.
void metaReaderClear(SMetaReader *pReader) {
  SMeta *pBound = pReader->pMeta;

  if (pBound != NULL) {
    metaULock(pBound);
  }

  tDecoderClear(&pReader->coder);
  tdbFree(pReader->pBuf);
}

H
Hongze Cheng 已提交
33
// Load the table entry stored in table.db under (version, uid) and decode it
// into pReader->me.
//
// Returns 0 on success and -1 on failure. When the key is not present in
// table.db, terrno is set to TSDB_CODE_PAR_TABLE_NOT_EXIST.
int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t uid) {
  SMeta   *pMeta = pReader->pMeta;
  STbDbKey key = {.version = version, .uid = uid};

  // Fetch the encoded entry from table.db into the reader's buffer.
  int rc = tdbTbGet(pMeta->pTbDb, &key, sizeof(key), &pReader->pBuf, &pReader->szBuf);
  if (rc < 0) {
    terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
    return -1;
  }

  // Decode straight out of the reader's buffer.
  tDecoderInit(&pReader->coder, pReader->pBuf, pReader->szBuf);

  return (metaDecodeEntry(&pReader->coder, &pReader->me) < 0) ? -1 : 0;
}

H
Hongze Cheng 已提交
56
// Resolve `uid` to its version through uid.idx, then load that entry via
// metaGetTableEntryByVersion().
//
// Returns -1 with terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST when uid.idx has no
// record for `uid`; otherwise returns whatever the by-version lookup returns.
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
  SMeta *pMeta = pReader->pMeta;

  // uid.idx lookup; the value lands in the reader's buffer.
  int rc = tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pReader->pBuf, &pReader->szBuf);
  if (rc < 0) {
    terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
    return -1;
  }

  // The leading 8 bytes of the index value are read as the entry version.
  int64_t entryVersion = *(int64_t *)pReader->pBuf;

  return metaGetTableEntryByVersion(pReader, entryVersion, uid);
}

H
Hongze Cheng 已提交
70
// Resolve a table name to its uid through name.idx, then load the entry via
// metaGetTableEntryByUid().
//
// The lookup key is the name including its terminating NUL byte. Returns -1
// with terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST when the name is not indexed.
int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
  SMeta *pMeta = pReader->pMeta;

  // name.idx lookup; the value lands in the reader's buffer.
  int rc = tdbTbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pReader->pBuf, &pReader->szBuf);
  if (rc < 0) {
    terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
    return -1;
  }

  tb_uid_t tableUid = *(tb_uid_t *)pReader->pBuf;

  return metaGetTableEntryByUid(pReader, tableUid);
}

H
Hongze Cheng 已提交
84
// Look up the uid registered for `name` in name.idx.
//
// Takes and releases the meta read lock around the lookup. Returns 0 when the
// name is not found.
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {
  tb_uid_t found = 0;
  void    *pVal = NULL;
  int      vLen = 0;

  metaRLock(pMeta);

  int rc = tdbTbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pVal, &vLen);
  if (rc == 0) {
    found = *(tb_uid_t *)pVal;
    tdbFree(pVal);
  }

  metaULock(pMeta);

  return found;
}

H
Hongze Cheng 已提交
101
// Placeholder: advancing a reader is not implemented yet, so this always
// reports success (0) without touching the reader.
int metaReadNext(SMetaReader *pReader) {
  SMeta *pMeta = pReader->pMeta;
  (void)pMeta;  // kept for the future implementation

  // TODO

  return 0;
}

#if 1  // ===================================================
H
Hongze Cheng 已提交
110 111 112 113 114 115 116 117
// Create a cursor that walks uid.idx from its first record.
//
// The embedded meta reader is initialised here, which takes the meta read
// lock; the lock is released by metaCloseTbCursor().
//
// Returns NULL when the cursor cannot be allocated or when the underlying
// TDB cursor cannot be opened. In the latter case the reader (and with it the
// read lock) and the allocation are released before returning, so the caller
// never sees a half-built cursor.
SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
  SMTbCursor *pTbCur = (SMTbCursor *)taosMemoryCalloc(1, sizeof(*pTbCur));
  if (pTbCur == NULL) {
    return NULL;
  }

  metaReaderInit(&pTbCur->mr, pMeta, 0);

  if (tdbTbcOpen(pMeta->pUidIdx, &pTbCur->pDbc, NULL) < 0) {
    // Undo metaReaderInit() so the read lock is not leaked.
    metaReaderClear(&pTbCur->mr);
    taosMemoryFree(pTbCur);
    return NULL;
  }

  tdbTbcMoveToFirst(pTbCur->pDbc);

  return pTbCur;
}

// Tear down a cursor created by metaOpenTbCursor(). A NULL cursor is ignored.
//
// Frees the key/value buffers, clears the embedded reader (which releases the
// meta read lock), closes the TDB cursor if one was opened, and finally frees
// the cursor itself.
void metaCloseTbCursor(SMTbCursor *pTbCur) {
  if (pTbCur == NULL) {
    return;
  }

  tdbFree(pTbCur->pKey);
  tdbFree(pTbCur->pVal);
  metaReaderClear(&pTbCur->mr);

  if (pTbCur->pDbc != NULL) {
    tdbTbcClose(pTbCur->pDbc);
  }

  taosMemoryFree(pTbCur);
}

H
Hongze Cheng 已提交
139
// Advance the cursor to the next table that is not a super table and load its
// entry into pTbCur->mr.me.
//
// Returns 0 when an entry was loaded. Returns -1 when uid.idx is exhausted or
// when the entry referenced by the index record cannot be loaded; in that
// case pTbCur->mr.me must not be used, since it may still describe the
// previous table.
int metaTbCursorNext(SMTbCursor *pTbCur) {
  for (;;) {
    int ret = tdbTbcNext(pTbCur->pDbc, &pTbCur->pKey, &pTbCur->kLen, &pTbCur->pVal, &pTbCur->vLen);
    if (ret < 0) {
      return -1;
    }

    // Drop the decoder state of the previously loaded entry before reuse.
    tDecoderClear(&pTbCur->mr.coder);

    // Index key is the uid; the first 8 bytes of the value are the version.
    if (metaGetTableEntryByVersion(&pTbCur->mr, *(int64_t *)pTbCur->pVal, *(tb_uid_t *)pTbCur->pKey) < 0) {
      return -1;
    }

    if (pTbCur->mr.me.type == TSDB_SUPER_TABLE) {
      continue;  // super tables are skipped by this cursor
    }

    break;
  }

  return 0;
}

H
Hongze Cheng 已提交
163
// Return a heap-allocated copy of the row schema of table `uid`.
//
// `sver` selects the schema version; -1 means "whatever the current entry
// carries". For a child table the lookup is redirected to its super table.
// When the current entry does not carry the requested version, the schema is
// read from the schema (skm) db instead. `isinline` is not used by this
// implementation.
//
// The meta read lock is held for the duration of the call. Returns NULL when
// any lookup or decode step fails. The returned wrapper comes from
// tCloneSSchemaWrapper() and is owned by the caller.
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) {
  void           *pData = NULL;
  int             nData = 0;
  int64_t         version;
  SSchemaWrapper  schema = {0};
  SSchemaWrapper *pSchema = NULL;
  SDecoder        dc = {0};

  metaRLock(pMeta);
_query:
  // uid.idx: uid -> version of the current entry
  if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData) < 0) {
    goto _err;
  }

  version = *(int64_t *)pData;

  // table.db: (uid, version) -> encoded entry
  if (tdbTbGet(pMeta->pTbDb, &(STbDbKey){.uid = uid, .version = version}, sizeof(STbDbKey), &pData, &nData) < 0) {
    goto _err;
  }

  SMetaEntry me = {0};
  tDecoderInit(&dc, pData, nData);
  if (metaDecodeEntry(&dc, &me) < 0) {
    tDecoderClear(&dc);
    goto _err;
  }

  if (me.type == TSDB_SUPER_TABLE) {
    if (sver == -1 || sver == me.stbEntry.schemaRow.version) {
      // Clone before the decoder is cleared.
      pSchema = tCloneSSchemaWrapper(&me.stbEntry.schemaRow);
      tDecoderClear(&dc);
      goto _exit;
    }
  } else if (me.type == TSDB_CHILD_TABLE) {
    // A child table has no schema of its own: retry with the super table.
    uid = me.ctbEntry.suid;
    tDecoderClear(&dc);
    goto _query;
  } else {
    if (sver == -1 || sver == me.ntbEntry.schemaRow.version) {
      pSchema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow);
      tDecoderClear(&dc);
      goto _exit;
    }
  }
  tDecoderClear(&dc);

  // The current entry has a different schema version: query the skm db.
  if (tdbTbGet(pMeta->pSkmDb, &(SSkmDbKey){.uid = uid, .sver = sver}, sizeof(SSkmDbKey), &pData, &nData) < 0) {
    goto _err;
  }

  tDecoderInit(&dc, pData, nData);
  if (tDecodeSSchemaWrapper(&dc, &schema) < 0) {
    tDecoderClear(&dc);
    taosMemoryFree(schema.pSchema);
    goto _err;
  }
  pSchema = tCloneSSchemaWrapper(&schema);
  tDecoderClear(&dc);
  // The decoded column array is only a temporary; release it after cloning,
  // as metaGetTbTSchemaEx() does for the same decoder call.
  taosMemoryFree(schema.pSchema);

_exit:
  metaULock(pMeta);
  tdbFree(pData);
  return pSchema;

_err:
  metaULock(pMeta);
  tdbFree(pData);
  return NULL;
}

H
Hongze Cheng 已提交
223 224 225
// Collect into `uidList` the uids found in ttl.idx by walking backwards from
// the position of the key (dtime = ttl, uid = INT64_MAX).
//
// Returns the tdbTbcOpen() error when the cursor cannot be opened, -1 with
// terrno = TSDB_CODE_OUT_OF_MEMORY when a uid cannot be appended to
// `uidList`, and 0 otherwise. Uids appended before a failure remain in the
// list.
int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList) {
  TBC *pCur;
  int  ret = tdbTbcOpen(pMeta->pTtlIdx, &pCur, NULL);
  if (ret < 0) {
    return ret;
  }

  STtlIdxKey ttlKey = {0};
  ttlKey.dtime = ttl;
  ttlKey.uid = INT64_MAX;
  int c = 0;
  tdbTbcMoveTo(pCur, &ttlKey, sizeof(ttlKey), &c);
  if (c < 0) {
    tdbTbcMoveToPrev(pCur);
  }

  int   code = 0;
  void *pKey = NULL;
  int   kLen = 0;
  while (tdbTbcPrev(pCur, &pKey, &kLen, NULL, NULL) >= 0) {
    ttlKey = *(STtlIdxKey *)pKey;
    // taosArrayPush() yields NULL when the array cannot grow.
    if (taosArrayPush(uidList, &ttlKey.uid) == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      break;
    }
  }
  tdbTbcClose(pCur);

  tdbFree(pKey);

  return code;
}

H
Hongze Cheng 已提交
256
struct SMCtbCursor {
H
Hongze Cheng 已提交
257 258
  SMeta   *pMeta;
  TBC     *pCur;
H
Hongze Cheng 已提交
259
  tb_uid_t suid;
H
Hongze Cheng 已提交
260 261
  void    *pKey;
  void    *pVal;
H
Hongze Cheng 已提交
262 263 264 265
  int      kLen;
  int      vLen;
};

H
Hongze Cheng 已提交
266 267
// Open a cursor over the child-table index, positioned for super table `uid`.
//
// The meta read lock is taken here and stays held until metaCloseCtbCursor().
// Returns NULL when the cursor cannot be allocated or the TDB cursor cannot be
// opened; the lock is released again on the latter path.
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
  SCtbIdxKey startKey;
  int        cmp = 0;

  SMCtbCursor *pCursor = (SMCtbCursor *)taosMemoryCalloc(1, sizeof(*pCursor));
  if (pCursor == NULL) {
    return NULL;
  }

  pCursor->pMeta = pMeta;
  pCursor->suid = uid;
  metaRLock(pMeta);

  if (tdbTbcOpen(pMeta->pCtbIdx, &pCursor->pCur, NULL) < 0) {
    metaULock(pMeta);
    taosMemoryFree(pCursor);
    return NULL;
  }

  // Seek with the smallest possible child uid for this super table.
  startKey.suid = uid;
  startKey.uid = INT64_MIN;
  tdbTbcMoveTo(pCursor->pCur, &startKey, sizeof(startKey), &cmp);
  if (cmp > 0) {
    tdbTbcMoveToNext(pCursor->pCur);
  }

  return pCursor;
}

C
Cary Xu 已提交
299
// Destroy a cursor created by metaOpenCtbCursor(). A NULL cursor is ignored.
//
// Releases the meta read lock when a meta handle is attached. When a TDB
// cursor is open, closes it and frees the key/value buffers. The cursor
// itself is freed last.
void metaCloseCtbCursor(SMCtbCursor *pCtbCur) {
  if (pCtbCur == NULL) {
    return;
  }

  if (pCtbCur->pMeta != NULL) {
    metaULock(pCtbCur->pMeta);
  }

  if (pCtbCur->pCur != NULL) {
    tdbTbcClose(pCtbCur->pCur);

    tdbFree(pCtbCur->pKey);
    tdbFree(pCtbCur->pVal);
  }

  taosMemoryFree(pCtbCur);
}

// Step the child-table cursor and return the uid of the record it lands on.
//
// Returns 0 when the index is exhausted or when the record belongs to a super
// table uid greater than the one the cursor was opened for.
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
  if (tdbTbcNext(pCtbCur->pCur, &pCtbCur->pKey, &pCtbCur->kLen, &pCtbCur->pVal, &pCtbCur->vLen) < 0) {
    return 0;
  }

  SCtbIdxKey *pIdxKey = pCtbCur->pKey;

  return (pIdxKey->suid > pCtbCur->suid) ? 0 : pIdxKey->uid;
}

C
Cary Xu 已提交
330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346
// Cursor state used by metaOpenStbCursor() / metaStbCursorNext() /
// metaCloseStbCursor() to walk the super-table uid index.
struct SMStbCursor {
  SMeta   *pMeta;  // meta handle; its read lock is held while the cursor is open
  TBC     *pCur;   // TDB cursor opened on pMeta->pSuidIdx
  tb_uid_t suid;   // super table uid the cursor was positioned at
  void    *pKey;   // key buffer filled by tdbTbcNext(), released with tdbFree()
  void    *pVal;   // value buffer filled by tdbTbcNext(), released with tdbFree()
  int      kLen;   // length of pKey as reported by tdbTbcNext()
  int      vLen;   // length of pVal as reported by tdbTbcNext()
};

// Open a cursor over the super-table uid index, positioned at `suid`.
//
// The meta read lock is taken here and stays held until metaCloseStbCursor().
// Returns NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when the cursor cannot be
// allocated or the TDB cursor cannot be opened; the lock is released again on
// the latter path.
SMStbCursor *metaOpenStbCursor(SMeta *pMeta, tb_uid_t suid) {
  int cmp = 0;

  SMStbCursor *pCursor = (SMStbCursor *)taosMemoryCalloc(1, sizeof(*pCursor));
  if (pCursor == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pCursor->pMeta = pMeta;
  pCursor->suid = suid;
  metaRLock(pMeta);

  if (tdbTbcOpen(pMeta->pSuidIdx, &pCursor->pCur, NULL) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    metaULock(pMeta);
    taosMemoryFree(pCursor);
    return NULL;
  }

  // Seek to the requested suid.
  tdbTbcMoveTo(pCursor->pCur, &suid, sizeof(suid), &cmp);
  if (cmp > 0) {
    tdbTbcMoveToNext(pCursor->pCur);
  }

  return pCursor;
}

// Destroy a cursor created by metaOpenStbCursor(). A NULL cursor is ignored.
//
// Releases the meta read lock when a meta handle is attached. When a TDB
// cursor is open, closes it and frees the key/value buffers. The cursor
// itself is freed last.
void metaCloseStbCursor(SMStbCursor *pStbCur) {
  if (pStbCur == NULL) {
    return;
  }

  if (pStbCur->pMeta != NULL) {
    metaULock(pStbCur->pMeta);
  }

  if (pStbCur->pCur != NULL) {
    tdbTbcClose(pStbCur->pCur);

    tdbFree(pStbCur->pKey);
    tdbFree(pStbCur->pVal);
  }

  taosMemoryFree(pStbCur);
}

// Step the super-table cursor and return the uid stored in the record's key.
// Returns 0 once the index is exhausted.
tb_uid_t metaStbCursorNext(SMStbCursor *pStbCur) {
  int rc = tdbTbcNext(pStbCur->pCur, &pStbCur->pKey, &pStbCur->kLen, &pStbCur->pVal, &pStbCur->vLen);

  if (rc >= 0) {
    return *(tb_uid_t *)pStbCur->pKey;
  }

  return 0;
}

H
Hongze Cheng 已提交
396
// Build an STSchema for table `uid` at schema version `sver`.
//
// The column list is obtained through metaGetTableSchema() and fed column by
// column into a schema builder. Returns NULL when the schema wrapper cannot be
// obtained; otherwise returns whatever tdGetSchemaFromBuilder() produces.
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
  SSchemaWrapper *pWrapper = metaGetTableSchema(pMeta, uid, sver, 0);
  if (pWrapper == NULL) {
    return NULL;
  }

  STSchemaBuilder builder = {0};
  tdInitTSchemaBuilder(&builder, pWrapper->version);

  for (int col = 0; col < pWrapper->nCols; col++) {
    SSchema *pCol = &pWrapper->pSchema[col];
    tdAddColToSchema(&builder, pCol->type, pCol->flags, pCol->colId, pCol->bytes);
  }

  STSchema *pResult = tdGetSchemaFromBuilder(&builder);
  tdDestroyTSchemaBuilder(&builder);

  // The wrapper was only needed to drive the builder.
  taosMemoryFree(pWrapper->pSchema);
  taosMemoryFree(pWrapper);

  return pResult;
}

H
Hongze Cheng 已提交
420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443
// Read schema version `sver` from the schema (skm) db and convert it into an
// STSchema stored in *ppTSchema.
//
// The lookup key uses `suid` when it is non-zero and `uid` otherwise. The meta
// read lock is held only around the db lookup.
//
// Returns 0 on success. On failure *ppTSchema is set to NULL and the result is
//   TSDB_CODE_NOT_FOUND    when the key is not present in the skm db,
//   TSDB_CODE_INVALID_MSG  when the stored schema cannot be decoded.
int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema **ppTSchema) {
  int32_t         code = 0;
  STSchema       *pTSchema = NULL;
  SSkmDbKey       skmDbKey = {.uid = suid ? suid : uid, .sver = sver};
  void           *pData = NULL;
  int             nData = 0;
  SDecoder        dc = {0};
  SSchemaWrapper  schema = {0};  // zeroed so the cleanup below is safe on every path
  STSchemaBuilder sb = {0};

  // query
  metaRLock(pMeta);
  if (tdbTbGet(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), &pData, &nData) < 0) {
    code = TSDB_CODE_NOT_FOUND;
    metaULock(pMeta);
    goto _err;
  }
  metaULock(pMeta);

  // decode
  tDecoderInit(&dc, pData, nData);
  int32_t decoded = tDecodeSSchemaWrapper(&dc, &schema);
  tDecoderClear(&dc);
  tdbFree(pData);
  if (decoded < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _err;
  }

  // convert
  tdInitTSchemaBuilder(&sb, schema.version);
  for (int i = 0; i < schema.nCols; i++) {
    SSchema *pSchema = schema.pSchema + i;
    tdAddColToSchema(&sb, pSchema->type, pSchema->flags, pSchema->colId, pSchema->bytes);
  }
  pTSchema = tdGetSchemaFromBuilder(&sb);
  tdDestroyTSchemaBuilder(&sb);

  *ppTSchema = pTSchema;
  taosMemoryFree(schema.pSchema);
  return code;

_err:
  taosMemoryFree(schema.pSchema);
  *ppTSchema = NULL;
  return code;
}

466 467
// N.B. Called by statusReq per second
int64_t metaGetTbNum(SMeta *pMeta) {
  // TODO: compute the real number of tables from pMeta. Until then a fixed
  // placeholder value is reported and pMeta is not consulted.
  const int64_t placeholderTbNum = 100;

  return placeholderTbNum;
}

// N.B. Called by statusReq per second
int64_t metaGetTimeSeriesNum(SMeta *pMeta) {
  // TODO: compute the real number of time series from pMeta. Until then a
  // fixed placeholder value is reported and pMeta is not consulted.
  const int64_t placeholderTimeSeriesNum = 400;

  return placeholderTimeSeriesNum;
}
H
Hongze Cheng 已提交
477

C
Cary Xu 已提交
478
typedef struct {
H
Hongze Cheng 已提交
479 480
  SMeta   *pMeta;
  TBC     *pCur;
C
Cary Xu 已提交
481
  tb_uid_t uid;
H
Hongze Cheng 已提交
482 483
  void    *pKey;
  void    *pVal;
C
Cary Xu 已提交
484 485 486 487 488 489 490 491 492 493 494 495 496
  int      kLen;
  int      vLen;
} SMSmaCursor;

// Open a cursor over the SMA index, positioned for table `uid`.
//
// The meta read lock is taken here and stays held until metaCloseSmaCursor().
// Returns NULL when the cursor cannot be allocated (terrno is set to
// TSDB_CODE_OUT_OF_MEMORY) or when the TDB cursor cannot be opened; the lock
// is released again on the latter path.
SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) {
  SMSmaCursor *pSmaCur = NULL;
  SSmaIdxKey   smaIdxKey;
  int          ret = 0;
  // Initialised like the sibling cursors so the comparison below never reads
  // an indeterminate value when tdbTbcMoveTo() does not store a result.
  int          c = 0;

  pSmaCur = (SMSmaCursor *)taosMemoryCalloc(1, sizeof(*pSmaCur));
  if (pSmaCur == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pSmaCur->pMeta = pMeta;
  pSmaCur->uid = uid;
  metaRLock(pMeta);

  ret = tdbTbcOpen(pMeta->pSmaIdx, &pSmaCur->pCur, NULL);
  if (ret < 0) {
    metaULock(pMeta);
    taosMemoryFree(pSmaCur);
    return NULL;
  }

  // Seek with the smallest possible sma uid for this table.
  smaIdxKey.uid = uid;
  smaIdxKey.smaUid = INT64_MIN;
  tdbTbcMoveTo(pSmaCur->pCur, &smaIdxKey, sizeof(smaIdxKey), &c);
  if (c > 0) {
    tdbTbcMoveToNext(pSmaCur->pCur);
  }

  return pSmaCur;
}
H
Hongze Cheng 已提交
521

C
Cary Xu 已提交
522 523 524 525
// Destroy a cursor created by metaOpenSmaCursor(). A NULL cursor is ignored.
//
// Releases the meta read lock when a meta handle is attached. When a TDB
// cursor is open, closes it and frees the key/value buffers. The cursor
// itself is freed last.
void metaCloseSmaCursor(SMSmaCursor *pSmaCur) {
  if (pSmaCur == NULL) {
    return;
  }

  if (pSmaCur->pMeta != NULL) {
    metaULock(pSmaCur->pMeta);
  }

  if (pSmaCur->pCur != NULL) {
    tdbTbcClose(pSmaCur->pCur);

    tdbFree(pSmaCur->pKey);
    tdbFree(pSmaCur->pVal);
  }

  taosMemoryFree(pSmaCur);
}

// Step the SMA cursor and return the table uid of the record it lands on.
// The record's key stays available through pSmaCur->pKey.
//
// Returns 0 when the index is exhausted or when the record's table uid is
// greater than the uid the cursor was opened for.
tb_uid_t metaSmaCursorNext(SMSmaCursor *pSmaCur) {
  if (tdbTbcNext(pSmaCur->pCur, &pSmaCur->pKey, &pSmaCur->kLen, &pSmaCur->pVal, &pSmaCur->vLen) < 0) {
    return 0;
  }

  SSmaIdxKey *pIdxKey = pSmaCur->pKey;

  return (pIdxKey->uid > pSmaCur->uid) ? 0 : pIdxKey->uid;
}

// Collect the SMA definitions attached to table `uid` into a newly allocated
// STSmaWrapper.
//
// With `deepCopy` the `expr` and `tagsFilter` payloads of every SMA are
// duplicated into their own allocations. Without it both pointers are set to
// NULL and both lengths to 0. SMA ids whose entry cannot be loaded are logged
// and skipped.
//
// Returns NULL when the table has no SMA ids, when no entry could be loaded,
// or when an allocation fails (terrno = TSDB_CODE_OUT_OF_MEMORY).
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
  STSmaWrapper *pSW = NULL;
  SArray       *pSmaIds = NULL;
  // Declared and zeroed before the first `goto _err`, so the
  // metaReaderClear() at _err always sees a well-defined reader (no meta
  // handle attached means no unlock is attempted).
  SMetaReader   mr = {0};

  if (!(pSmaIds = metaGetSmaIdsByTable(pMeta, uid))) {
    return NULL;
  }

  pSW = taosMemoryCalloc(1, sizeof(*pSW));
  if (!pSW) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pSW->number = taosArrayGetSize(pSmaIds);
  pSW->tSma = taosMemoryCalloc(pSW->number, sizeof(STSma));

  if (!pSW->tSma) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  metaReaderInit(&mr, pMeta, 0);
  int64_t smaId;
  int     smaIdx = 0;
  STSma  *pTSma = NULL;
  for (int i = 0; i < pSW->number; ++i) {
    smaId = *(tb_uid_t *)taosArrayGet(pSmaIds, i);

    // Drop the decoder state left by the previous lookup before the reader
    // is reused, as metaTbCursorNext() does.
    tDecoderClear(&mr.coder);

    if (metaGetTableEntryByUid(&mr, smaId) < 0) {
      metaWarn("vgId:%d, no entry for tbId: %" PRIi64 ", smaId: %" PRIi64, TD_VID(pMeta->pVnode), uid, smaId);
      continue;
    }

    const STSma *pSrc = mr.me.smaEntry.tsma;
    pTSma = pSW->tSma + smaIdx;
    memcpy(pTSma, pSrc, sizeof(STSma));

    // The struct copy carries pointers that belong to the reader. Reset them
    // so the result (and the _err cleanup) only ever sees NULL or memory
    // allocated below.
    pTSma->expr = NULL;
    pTSma->tagsFilter = NULL;

    if (deepCopy) {
      if (pTSma->exprLen > 0) {
        if (!(pTSma->expr = taosMemoryCalloc(1, pTSma->exprLen))) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          goto _err;
        }
        memcpy((void *)pTSma->expr, pSrc->expr, pTSma->exprLen);
      }
      if (pTSma->tagsFilterLen > 0) {
        if (!(pTSma->tagsFilter = taosMemoryCalloc(1, pTSma->tagsFilterLen))) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          goto _err;
        }
        memcpy((void *)pTSma->tagsFilter, pSrc->tagsFilter, pTSma->tagsFilterLen);
      }
    } else {
      pTSma->exprLen = 0;
      pTSma->tagsFilterLen = 0;
    }

    ++smaIdx;
  }

  if (smaIdx <= 0) goto _err;
  pSW->number = smaIdx;

  metaReaderClear(&mr);
  taosArrayDestroy(pSmaIds);
  return pSW;
_err:
  metaReaderClear(&mr);
  taosArrayDestroy(pSmaIds);
  tFreeTSmaWrapper(pSW, deepCopy);
  return NULL;
}

C
Cary Xu 已提交
626
// Return a heap-allocated copy of the STSma stored under `indexUid`.
//
// Returns NULL when the entry cannot be loaded (a warning is logged) or when
// the copy cannot be allocated (terrno = TSDB_CODE_OUT_OF_MEMORY).
//
// NOTE(review): this is a plain struct copy, so any pointer members of STSma
// keep the values they had inside the reader, which is cleared before
// returning. Confirm callers do not dereference them.
STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
  SMetaReader reader = {0};
  STSma      *pCopy = NULL;

  metaReaderInit(&reader, pMeta, 0);

  if (metaGetTableEntryByUid(&reader, indexUid) < 0) {
    metaWarn("vgId:%d, failed to get table entry for smaId: %" PRIi64, TD_VID(pMeta->pVnode), indexUid);
    metaReaderClear(&reader);
    return NULL;
  }

  pCopy = (STSma *)taosMemoryMalloc(sizeof(STSma));
  if (pCopy == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    metaReaderClear(&reader);
    return NULL;
  }

  memcpy(pCopy, reader.me.smaEntry.tsma, sizeof(STSma));

  metaReaderClear(&reader);
  return pCopy;
}

C
Cary Xu 已提交
648
// Collect the sma uids recorded in the SMA index for table `uid`.
//
// Returns a new array of tb_uid_t owned by the caller. Returns NULL when the
// cursor cannot be opened, when the cursor yields no record, or when an
// allocation fails (terrno = TSDB_CODE_OUT_OF_MEMORY).
SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) {
  SArray *pUids = NULL;

  SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid);
  if (!pCur) {
    return NULL;
  }

  // metaSmaCursorNext() returns 0 once there is nothing more for this table.
  while (metaSmaCursorNext(pCur) != 0) {
    if (!pUids) {
      // Created lazily so a table without SMAs yields NULL, not an empty array.
      pUids = taosArrayInit(16, sizeof(tb_uid_t));
      if (!pUids) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        metaCloseSmaCursor(pCur);
        return NULL;
      }
    }

    SSmaIdxKey *pSmaIdxKey = (SSmaIdxKey *)pCur->pKey;

    // taosArrayPush() returns a pointer (NULL on failure), so compare against
    // NULL rather than against 0 with `<`.
    if (taosArrayPush(pUids, &pSmaIdxKey->smaUid) == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      metaCloseSmaCursor(pCur);
      taosArrayDestroy(pUids);
      return NULL;
    }
  }

  metaCloseSmaCursor(pCur);
  return pUids;
}

C
Cary Xu 已提交
686
// Collect the distinct table uids that appear in the SMA index. Consecutive
// records with the same uid are reported once.
//
// Returns a new array of tb_uid_t owned by the caller. Returns NULL when the
// cursor cannot be opened, when the cursor yields no record, or when an
// allocation fails (terrno = TSDB_CODE_OUT_OF_MEMORY).
//
// NOTE(review): the cursor is opened with uid 0, and metaSmaCursorNext()
// returns 0 for every record whose uid is greater than the cursor's uid. As
// written the loop looks like it stops at the first record with a positive
// uid; confirm this is intended.
SArray *metaGetSmaTbUids(SMeta *pMeta) {
  SArray  *pUids = NULL;
  tb_uid_t lastUid = 0;

  SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, 0);
  if (!pCur) {
    return NULL;
  }

  while (1) {
    tb_uid_t uid = metaSmaCursorNext(pCur);
    if (uid == 0) {
      break;
    }

    if (lastUid == uid) {
      continue;  // same table as the previous record
    }

    lastUid = uid;

    if (!pUids) {
      pUids = taosArrayInit(16, sizeof(tb_uid_t));
      if (!pUids) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        metaCloseSmaCursor(pCur);
        return NULL;
      }
    }

    // taosArrayPush() returns a pointer (NULL on failure), so compare against
    // NULL rather than against 0 with `<`.
    if (taosArrayPush(pUids, &uid) == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      metaCloseSmaCursor(pCur);
      taosArrayDestroy(pUids);
      return NULL;
    }
  }

  metaCloseSmaCursor(pCur);
  return pUids;
}

L
Liu Jicong 已提交
729
#endif
H
Hongze Cheng 已提交
730

wmmhello's avatar
wmmhello 已提交
731
// Fetch a tag value from a child-table entry.
//
// For TSDB_DATA_TYPE_JSON the whole tag blob of the entry is returned.
// Otherwise tTagGet() is asked to fill `val`, and `val` is returned when it
// reports a hit, NULL when it does not. Asserts that the entry is a child
// table.
const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t type, STagVal *val) {
  ASSERT(pEntry->type == TSDB_CHILD_TABLE);

  STag *pTag = (STag *)pEntry->ctbEntry.pTags;

  if (type == TSDB_DATA_TYPE_JSON) {
    return pTag;
  }

  if (tTagGet(pTag, val)) {
    return val;
  }

  return NULL;
}
wmmhello's avatar
wmmhello 已提交
744 745

typedef struct {
H
Hongze Cheng 已提交
746 747
  SMeta   *pMeta;
  TBC     *pCur;
wmmhello's avatar
wmmhello 已提交
748 749 750
  tb_uid_t suid;
  int16_t  cid;
  int16_t  type;
H
Hongze Cheng 已提交
751 752
  void    *pKey;
  void    *pVal;
wmmhello's avatar
wmmhello 已提交
753 754 755 756 757 758
  int32_t  kLen;
  int32_t  vLen;
} SIdxCursor;

// Scan the tag index and append to `pUids` the uid of every record accepted
// by param->filterFunc.
//
// The scan starts at the index key built from (param->suid, param->cid,
// param->val) and moves backwards when param->reverse is set, forwards
// otherwise. For each record, filterFunc returns 0 for a match, 1 for "no
// match, keep scanning", and anything else to stop the scan. NCHAR filter
// values are converted with taosMbsToUcs4() before the key is built.
//
// The meta read lock is held for the whole scan and released on every exit
// path. Returns 0 when the scan ran. Returns a non-zero value when
// param->val is NULL, when an allocation or the NCHAR conversion fails, when
// the cursor cannot be opened, or when the index key cannot be built. uids
// appended before a failure remain in `pUids`.
int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
  SIdxCursor *pCursor = NULL;
  STagIdxKey *pKey = NULL;
  char       *buf = NULL;
  void       *tagData = NULL;
  int32_t     nKey = 0;
  int32_t     nTagData = 0;
  int32_t     maxSize = 0;
  int32_t     ret = 0, valid = 0;

  pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor));
  if (pCursor == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pCursor->pMeta = pMeta;
  pCursor->suid = param->suid;
  pCursor->cid = param->cid;
  pCursor->type = param->type;

  metaRLock(pMeta);
  ret = tdbTbcOpen(pMeta->pTagIdx, &pCursor->pCur, NULL);
  if (ret < 0) {
    goto END;
  }

  if (param->val == NULL) {
    metaError("vgId:%d, failed to filter NULL data", TD_VID(pMeta->pVnode));
    // Leave through END so the lock, the TDB cursor and pCursor are released.
    ret = -1;
    goto END;
  }

  if (IS_VAR_DATA_TYPE(param->type)) {
    tagData = varDataVal(param->val);
    nTagData = varDataLen(param->val);

    if (param->type == TSDB_DATA_TYPE_NCHAR) {
      maxSize = 4 * nTagData + 1;
      buf = taosMemoryCalloc(1, maxSize);
      if (buf == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        ret = -1;
        goto END;
      }
      if (false == taosMbsToUcs4(tagData, nTagData, (TdUcs4 *)buf, maxSize, &maxSize)) {
        ret = -1;  // a failed conversion must not be reported as success
        goto END;
      }

      tagData = buf;
      nTagData = maxSize;
    }
  } else {
    tagData = param->val;
    nTagData = tDataTypes[param->type].bytes;
  }

  // NOTE(review): pKey is produced by metaCreateTagIdxKey() and is not
  // released anywhere in this function; confirm who owns it.
  ret = metaCreateTagIdxKey(pCursor->suid, pCursor->cid, tagData, nTagData, pCursor->type,
                            param->reverse ? INT64_MAX : INT64_MIN, &pKey, &nKey);
  if (ret != 0) {
    goto END;
  }

  int seekCmp = 0;
  if (tdbTbcMoveTo(pCursor->pCur, pKey, nKey, &seekCmp) < 0) {
    // Kept as in the original: a failed seek ends the call with ret == 0.
    goto END;
  }

  void   *entryKey = NULL, *entryVal = NULL;
  int32_t nEntryKey, nEntryVal;
  bool    first = true;
  while (1) {
    valid = tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, (const void **)&entryVal, &nEntryVal);
    if (valid < 0) {
      break;
    }

    STagIdxKey *p = entryKey;
    if (p != NULL) {
      if (p->type != pCursor->type) {
        if (!first) {
          break;  // left the run of records of the requested type
        }
        // Still before the first record of the requested type: keep moving.
        valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
        if (valid < 0) break;
        continue;
      }
      first = false;

      int32_t verdict = (*param->filterFunc)(p->data, pKey->data, pKey->type);
      if (verdict == 0) {
        // match: the table uid is stored right after the tag value
        tb_uid_t tuid = 0;
        if (IS_VAR_DATA_TYPE(pKey->type)) {
          tuid = *(tb_uid_t *)(p->data + varDataTLen(p->data));
        } else {
          tuid = *(tb_uid_t *)(p->data + tDataTypes[pCursor->type].bytes);
        }
        if (taosArrayPush(pUids, &tuid) == NULL) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          ret = -1;
          break;
        }
      } else if (verdict == 1) {
        // not match but should continue to iter
      } else {
        // not match and no more result
        break;
      }
    }

    valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
    if (valid < 0) {
      break;
    }
  }

END:
  if (pCursor->pMeta) metaULock(pCursor->pMeta);
  if (pCursor->pCur) tdbTbcClose(pCursor->pCur);
  taosMemoryFree(buf);

  taosMemoryFree(pCursor);

  return ret;
}