metaSnapshot.c 18.7 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "meta.h"

H
Hongze Cheng 已提交
18
// SMetaSnapReader ========================================
H
Hongze Cheng 已提交
19
struct SMetaSnapReader {
H
Hongze Cheng 已提交
20 21 22
  SMeta*  pMeta;
  int64_t sver;
  int64_t ever;
H
Hongze Cheng 已提交
23
  TBC*    pTbc;
H
Hongze Cheng 已提交
24 25
};

H
Hongze Cheng 已提交
26
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) {
H
Hongze Cheng 已提交
27 28
  int32_t          code = 0;
  int32_t          c = 0;
H
Hongze Cheng 已提交
29
  SMetaSnapReader* pReader = NULL;
H
Hongze Cheng 已提交
30

H
Hongze Cheng 已提交
31
  // alloc
H
Hongze Cheng 已提交
32 33
  pReader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
H
Hongze Cheng 已提交
34 35 36
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
H
Hongze Cheng 已提交
37 38 39
  pReader->pMeta = pMeta;
  pReader->sver = sver;
  pReader->ever = ever;
H
Hongze Cheng 已提交
40 41

  // impl
H
Hongze Cheng 已提交
42
  code = tdbTbcOpen(pMeta->pTbDb, &pReader->pTbc, NULL);
H
Hongze Cheng 已提交
43
  if (code) {
H
Hongze Cheng 已提交
44
    taosMemoryFree(pReader);
H
Hongze Cheng 已提交
45 46 47
    goto _err;
  }

H
Hongze Cheng 已提交
48
  code = tdbTbcMoveTo(pReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c);
H
Hongze Cheng 已提交
49
  if (code) {
H
Hongze Cheng 已提交
50
    taosMemoryFree(pReader);
H
Hongze Cheng 已提交
51 52 53
    goto _err;
  }

S
Shengliang Guan 已提交
54
  metaInfo("vgId:%d, vnode snapshot meta reader opened", TD_VID(pMeta->pVnode));
H
Hongze Cheng 已提交
55 56

  *ppReader = pReader;
H
Hongze Cheng 已提交
57 58 59
  return code;

_err:
S
Shengliang Guan 已提交
60
  metaError("vgId:%d, vnode snapshot meta reader open failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
61 62
  *ppReader = NULL;
  return code;
H
Hongze Cheng 已提交
63 64
}

H
Hongze Cheng 已提交
65
int32_t metaSnapReaderClose(SMetaSnapReader** ppReader) {
H
Hongze Cheng 已提交
66 67
  int32_t code = 0;

H
Hongze Cheng 已提交
68 69 70
  tdbTbcClose((*ppReader)->pTbc);
  taosMemoryFree(*ppReader);
  *ppReader = NULL;
H
Hongze Cheng 已提交
71 72

  return code;
H
Hongze Cheng 已提交
73 74
}

H
Hongze Cheng 已提交
75
int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
H
Hongze Cheng 已提交
76
  int32_t     code = 0;
H
Hongze Cheng 已提交
77 78 79 80
  const void* pKey = NULL;
  const void* pData = NULL;
  int32_t     nKey = 0;
  int32_t     nData = 0;
H
Hongze Cheng 已提交
81
  STbDbKey    key;
H
Hongze Cheng 已提交
82

H
Hongze Cheng 已提交
83
  *ppData = NULL;
H
Hongze Cheng 已提交
84
  for (;;) {
H
Hongze Cheng 已提交
85
    if (tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData)) {
H
Hongze Cheng 已提交
86
      goto _exit;
H
Hongze Cheng 已提交
87 88
    }

H
Hongze Cheng 已提交
89 90 91 92 93 94
    key = ((STbDbKey*)pKey)[0];
    if (key.version > pReader->ever) {
      goto _exit;
    }

    if (key.version < pReader->sver) {
H
Hongze Cheng 已提交
95
      tdbTbcMoveToNext(pReader->pTbc);
H
Hongze Cheng 已提交
96 97 98
      continue;
    }

H
Hongze Cheng 已提交
99
    tdbTbcMoveToNext(pReader->pTbc);
H
Hongze Cheng 已提交
100 101 102
    break;
  }

H
Hongze Cheng 已提交
103 104 105 106
  ASSERT(pData && nData);

  *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + nData);
  if (*ppData == NULL) {
H
Hongze Cheng 已提交
107
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
108
    goto _err;
H
Hongze Cheng 已提交
109
  }
H
Hongze Cheng 已提交
110 111

  SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
W
wenzhouwww 已提交
112
  pHdr->type = SNAP_DATA_META;
H
Hongze Cheng 已提交
113 114 115
  pHdr->size = nData;
  memcpy(pHdr->data, pData, nData);

S
Shengliang Guan 已提交
116
  metaInfo("vgId:%d, vnode snapshot meta read data, version:%" PRId64 " uid:%" PRId64 " nData:%d",
H
Hongze Cheng 已提交
117
           TD_VID(pReader->pMeta->pVnode), key.version, key.uid, nData);
H
Hongze Cheng 已提交
118 119

_exit:
H
Hongze Cheng 已提交
120
  return code;
H
Hongze Cheng 已提交
121 122

_err:
S
Shengliang Guan 已提交
123
  metaError("vgId:%d, vnode snapshot meta read data failed since %s", TD_VID(pReader->pMeta->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
124
  return code;
H
Hongze Cheng 已提交
125 126
}

H
Hongze Cheng 已提交
127 128 129 130 131 132 133
// SMetaSnapWriter ========================================
struct SMetaSnapWriter {
  SMeta*  pMeta;
  int64_t sver;
  int64_t ever;
};

H
Hongze Cheng 已提交
134 135 136 137 138 139 140 141 142 143 144 145 146 147
int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWriter** ppWriter) {
  int32_t          code = 0;
  SMetaSnapWriter* pWriter;

  // alloc
  pWriter = (SMetaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
  if (pWriter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pWriter->pMeta = pMeta;
  pWriter->sver = sver;
  pWriter->ever = ever;

148
  metaBegin(pMeta, META_BEGIN_HEAP_NIL);
H
Hongze Cheng 已提交
149

H
Hongze Cheng 已提交
150 151 152 153
  *ppWriter = pWriter;
  return code;

_err:
S
Shengliang Guan 已提交
154
  metaError("vgId:%d, meta snapshot writer open failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
155 156 157 158 159 160 161 162 163
  *ppWriter = NULL;
  return code;
}

int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) {
  int32_t          code = 0;
  SMetaSnapWriter* pWriter = *ppWriter;

  if (rollback) {
164 165
    code = metaAbort(pWriter->pMeta);
    if (code) goto _err;
H
Hongze Cheng 已提交
166
  } else {
167
    code = metaCommit(pWriter->pMeta, pWriter->pMeta->txn);
H
Hongze Cheng 已提交
168
    if (code) goto _err;
169
    code = metaFinishCommit(pWriter->pMeta, pWriter->pMeta->txn);
170
    if (code) goto _err;
H
Hongze Cheng 已提交
171 172 173 174 175 176 177
  }
  taosMemoryFree(pWriter);
  *ppWriter = NULL;

  return code;

_err:
S
Shengliang Guan 已提交
178
  metaError("vgId:%d, meta snapshot writer close failed since %s", TD_VID(pWriter->pMeta->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
179 180 181
  return code;
}

H
Hongze Cheng 已提交
182
int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
H
Hongze Cheng 已提交
183 184 185 186 187
  int32_t    code = 0;
  SMeta*     pMeta = pWriter->pMeta;
  SMetaEntry metaEntry = {0};
  SDecoder*  pDecoder = &(SDecoder){0};

H
Hongze Cheng 已提交
188
  tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
H
Hongze Cheng 已提交
189 190 191 192 193
  metaDecodeEntry(pDecoder, &metaEntry);

  code = metaHandleEntry(pMeta, &metaEntry);
  if (code) goto _err;

H
Hongze Cheng 已提交
194
  tDecoderClear(pDecoder);
H
Hongze Cheng 已提交
195 196 197
  return code;

_err:
S
Shengliang Guan 已提交
198
  metaError("vgId:%d, vnode snapshot meta write failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
199
  return code;
W
wenzhouwww 已提交
200
}
201

202 203 204 205 206
typedef struct STableInfoForChildTable {
  char*           tableName;
  SSchemaWrapper* schemaRow;
  SSchemaWrapper* tagRow;
} STableInfoForChildTable;
207 208 209

static void destroySTableInfoForChildTable(void* data) {
  STableInfoForChildTable* pData = (STableInfoForChildTable*)data;
wmmhello's avatar
wmmhello 已提交
210
  taosMemoryFree(pData->tableName);
211
  tDeleteSSchemaWrapper(pData->schemaRow);
wmmhello's avatar
wmmhello 已提交
212
  tDeleteSSchemaWrapper(pData->tagRow);
213 214
}

215
static void MoveToSnapShotVersion(SSnapContext* ctx) {
wmmhello's avatar
wmmhello 已提交
216 217 218
  tdbTbcClose(ctx->pCur);
  tdbTbcOpen(ctx->pMeta->pTbDb, &ctx->pCur, NULL);
  STbDbKey key = {.version = ctx->snapVersion, .uid = INT64_MAX};
219
  int      c = 0;
wmmhello's avatar
wmmhello 已提交
220
  tdbTbcMoveTo(ctx->pCur, &key, sizeof(key), &c);
221
  if (c < 0) {
wmmhello's avatar
wmmhello 已提交
222 223 224 225
    tdbTbcMoveToPrev(ctx->pCur);
  }
}

226
static int32_t MoveToPosition(SSnapContext* ctx, int64_t ver, int64_t uid) {
wmmhello's avatar
wmmhello 已提交
227 228 229
  tdbTbcClose(ctx->pCur);
  tdbTbcOpen(ctx->pMeta->pTbDb, &ctx->pCur, NULL);
  STbDbKey key = {.version = ver, .uid = uid};
230
  int      c = 0;
wmmhello's avatar
wmmhello 已提交
231
  tdbTbcMoveTo(ctx->pCur, &key, sizeof(key), &c);
232
  return c;
wmmhello's avatar
wmmhello 已提交
233 234
}

235
static void MoveToFirst(SSnapContext* ctx) {
wmmhello's avatar
wmmhello 已提交
236 237 238 239 240
  tdbTbcClose(ctx->pCur);
  tdbTbcOpen(ctx->pMeta->pTbDb, &ctx->pCur, NULL);
  tdbTbcMoveToFirst(ctx->pCur);
}

241
static void saveSuperTableInfoForChildTable(SMetaEntry* me, SHashObj* suidInfo) {
wmmhello's avatar
wmmhello 已提交
242
  STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(suidInfo, &me->uid, sizeof(tb_uid_t));
243
  if (data) {
wmmhello's avatar
wmmhello 已提交
244 245 246 247
    return;
  }
  STableInfoForChildTable dataTmp = {0};
  dataTmp.tableName = strdup(me->name);
wmmhello's avatar
wmmhello 已提交
248

wmmhello's avatar
wmmhello 已提交
249 250 251 252 253
  dataTmp.schemaRow = tCloneSSchemaWrapper(&me->stbEntry.schemaRow);
  dataTmp.tagRow = tCloneSSchemaWrapper(&me->stbEntry.schemaTag);
  taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable));
}

254 255
int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta,
                         SSnapContext** ctxRet) {
wmmhello's avatar
wmmhello 已提交
256
  SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext));
257
  if (ctx == NULL) return -1;
wmmhello's avatar
wmmhello 已提交
258
  *ctxRet = ctx;
259 260 261 262 263
  ctx->pMeta = pMeta;
  ctx->snapVersion = snapVersion;
  ctx->suid = suid;
  ctx->subType = subType;
  ctx->queryMetaOrData = withMeta;
wmmhello's avatar
wmmhello 已提交
264
  ctx->withMeta = withMeta;
265
  ctx->idVersion = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
266
  if (ctx->idVersion == NULL) {
267 268 269 270
    return -1;
  }

  ctx->suidInfo = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
271
  if (ctx->suidInfo == NULL) {
272 273 274 275
    return -1;
  }
  taosHashSetFreeFp(ctx->suidInfo, destroySTableInfoForChildTable);

wmmhello's avatar
wmmhello 已提交
276 277
  ctx->index = 0;
  ctx->idList = taosArrayInit(100, sizeof(int64_t));
278 279
  void* pKey = NULL;
  void* pVal = NULL;
wmmhello's avatar
wmmhello 已提交
280
  int   vLen = 0, kLen = 0;
281

wmmhello's avatar
wmmhello 已提交
282 283
  metaDebug("tmqsnap init snapVersion:%" PRIi64, ctx->snapVersion);
  MoveToFirst(ctx);
284
  while (1) {
wmmhello's avatar
wmmhello 已提交
285 286
    int32_t ret = tdbTbcNext(ctx->pCur, &pKey, &kLen, &pVal, &vLen);
    if (ret < 0) break;
287
    STbDbKey* tmp = (STbDbKey*)pKey;
wmmhello's avatar
wmmhello 已提交
288 289
    if (tmp->version > ctx->snapVersion) break;

290
    SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
291
    if (idData) {
292 293 294
      continue;
    }

295 296
    if (tdbTbGet(pMeta->pUidIdx, &tmp->uid, sizeof(tb_uid_t), NULL, NULL) <
        0) {  // check if table exist for now, need optimize later
297 298 299
      continue;
    }

wmmhello's avatar
wmmhello 已提交
300 301 302 303
    SDecoder   dc = {0};
    SMetaEntry me = {0};
    tDecoderInit(&dc, pVal, vLen);
    metaDecodeEntry(&dc, &me);
304
    if (ctx->subType == TOPIC_SUB_TYPE__TABLE) {
wmmhello's avatar
wmmhello 已提交
305
      if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
306
          (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) {
307
        tDecoderClear(&dc);
wmmhello's avatar
wmmhello 已提交
308 309 310
        continue;
      }
    }
311 312 313 314 315 316 317

    taosArrayPush(ctx->idList, &tmp->uid);
    metaDebug("tmqsnap init idlist name:%s, uid:%" PRIi64, me.name, tmp->uid);
    SIdInfo info = {0};
    taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo));

    tDecoderClear(&dc);
wmmhello's avatar
wmmhello 已提交
318 319 320 321
  }
  taosHashClear(ctx->idVersion);

  MoveToSnapShotVersion(ctx);
322
  while (1) {
wmmhello's avatar
wmmhello 已提交
323
    int32_t ret = tdbTbcPrev(ctx->pCur, &pKey, &kLen, &pVal, &vLen);
324 325
    if (ret < 0) break;

326 327 328
    STbDbKey* tmp = (STbDbKey*)pKey;
    SIdInfo*  idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
    if (idData) {
329
      continue;
wmmhello's avatar
wmmhello 已提交
330
    }
331 332
    SIdInfo info = {.version = tmp->version, .index = 0};
    taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo));
wmmhello's avatar
wmmhello 已提交
333 334 335 336 337

    SDecoder   dc = {0};
    SMetaEntry me = {0};
    tDecoderInit(&dc, pVal, vLen);
    metaDecodeEntry(&dc, &me);
338
    if (ctx->subType == TOPIC_SUB_TYPE__TABLE) {
wmmhello's avatar
wmmhello 已提交
339
      if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
340
          (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) {
341
        tDecoderClear(&dc);
wmmhello's avatar
wmmhello 已提交
342 343 344 345
        continue;
      }
    }

346 347
    if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) ||
        (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
wmmhello's avatar
wmmhello 已提交
348 349 350 351 352
      saveSuperTableInfoForChildTable(&me, ctx->suidInfo);
    }
    tDecoderClear(&dc);
  }

353 354
  for (int i = 0; i < taosArrayGetSize(ctx->idList); i++) {
    int64_t* uid = taosArrayGet(ctx->idList, i);
wmmhello's avatar
wmmhello 已提交
355 356 357
    SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, uid, sizeof(int64_t));
    ASSERT(idData);
    idData->index = i;
358 359
    metaDebug("tmqsnap init idVersion uid:%" PRIi64 " version:%" PRIi64 " index:%d", *uid, idData->version,
              idData->index);
360
  }
wmmhello's avatar
wmmhello 已提交
361

362 363
  tdbFree(pKey);
  tdbFree(pVal);
364 365 366
  return TDB_CODE_SUCCESS;
}

367
int32_t destroySnapContext(SSnapContext* ctx) {
368
  tdbTbcClose(ctx->pCur);
wmmhello's avatar
wmmhello 已提交
369
  taosArrayDestroy(ctx->idList);
370 371
  taosHashCleanup(ctx->idVersion);
  taosHashCleanup(ctx->suidInfo);
wmmhello's avatar
wmmhello 已提交
372
  taosMemoryFree(ctx);
373 374 375
  return 0;
}

376 377
static int32_t buildNormalChildTableInfo(SVCreateTbReq* req, void** pBuf, int32_t* contLen) {
  int32_t            ret = 0;
wmmhello's avatar
wmmhello 已提交
378
  SVCreateTbBatchReq reqs = {0};
379 380

  reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
381
  if (NULL == reqs.pArray) {
382 383 384
    ret = -1;
    goto end;
  }
wmmhello's avatar
wmmhello 已提交
385
  taosArrayPush(reqs.pArray, req);
386 387 388
  reqs.nReqs = 1;

  tEncodeSize(tEncodeSVCreateTbBatchReq, &reqs, *contLen, ret);
389
  if (ret < 0) {
390 391 392 393 394 395 396 397 398 399
    ret = -1;
    goto end;
  }
  *contLen += sizeof(SMsgHead);
  *pBuf = taosMemoryMalloc(*contLen);
  if (NULL == *pBuf) {
    ret = -1;
    goto end;
  }
  SEncoder coder = {0};
wmmhello's avatar
wmmhello 已提交
400
  tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), *contLen);
401 402 403 404 405 406 407 408 409 410 411 412 413
  if (tEncodeSVCreateTbBatchReq(&coder, &reqs) < 0) {
    taosMemoryFreeClear(*pBuf);
    tEncoderClear(&coder);
    ret = -1;
    goto end;
  }
  tEncoderClear(&coder);

end:
  taosArrayDestroy(reqs.pArray);
  return ret;
}

414
static int32_t buildSuperTableInfo(SVCreateStbReq* req, void** pBuf, int32_t* contLen) {
415 416 417 418 419 420 421 422 423 424 425 426
  int32_t ret = 0;
  tEncodeSize(tEncodeSVCreateStbReq, req, *contLen, ret);
  if (ret < 0) {
    return -1;
  }

  *contLen += sizeof(SMsgHead);
  *pBuf = taosMemoryMalloc(*contLen);
  if (NULL == *pBuf) {
    return -1;
  }

427
  SEncoder encoder = {0};
wmmhello's avatar
wmmhello 已提交
428
  tEncoderInit(&encoder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), *contLen);
429 430 431 432 433 434 435 436 437
  if (tEncodeSVCreateStbReq(&encoder, req) < 0) {
    taosMemoryFreeClear(*pBuf);
    tEncoderClear(&encoder);
    return -1;
  }
  tEncoderClear(&encoder);
  return 0;
}

438
int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) {
439
  int c = 0;
wmmhello's avatar
wmmhello 已提交
440

441
  if (uid == 0) {
wmmhello's avatar
wmmhello 已提交
442
    ctx->index = 0;
443 444 445
    return c;
  }

wmmhello's avatar
wmmhello 已提交
446
  SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, &uid, sizeof(tb_uid_t));
447
  if (!idInfo) {
448 449 450
    return -1;
  }

wmmhello's avatar
wmmhello 已提交
451
  ctx->index = idInfo->index;
452 453 454 455

  return c;
}

456
int32_t getMetafromSnapShot(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid) {
457
  int32_t ret = 0;
458 459 460
  void*   pKey = NULL;
  void*   pVal = NULL;
  int     vLen = 0, kLen = 0;
461

462 463
  while (1) {
    if (ctx->index >= taosArrayGetSize(ctx->idList)) {
464 465
      metaDebug("tmqsnap get meta end");
      ctx->index = 0;
466
      ctx->queryMetaOrData = false;  // change to get data
467 468
      return 0;
    }
469

470 471 472 473 474 475 476
    int64_t* uidTmp = taosArrayGet(ctx->idList, ctx->index);
    ctx->index++;
    SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, uidTmp, sizeof(tb_uid_t));
    ASSERT(idInfo);

    *uid = *uidTmp;
    ret = MoveToPosition(ctx, idInfo->version, *uidTmp);
477
    if (ret == 0) {
478 479 480 481
      break;
    }
    metaDebug("tmqsnap get meta not exist uid:%" PRIi64 " version:%" PRIi64, *uid, idInfo->version);
  }
wmmhello's avatar
wmmhello 已提交
482 483 484 485 486 487

  tdbTbcGet(ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
  SDecoder   dc = {0};
  SMetaEntry me = {0};
  tDecoderInit(&dc, pVal, vLen);
  metaDecodeEntry(&dc, &me);
488
  metaDebug("tmqsnap get meta uid:%" PRIi64 " name:%s index:%d", *uid, me.name, ctx->index - 1);
wmmhello's avatar
wmmhello 已提交
489

490 491
  if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) ||
      (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
wmmhello's avatar
wmmhello 已提交
492 493 494 495 496 497 498 499 500 501 502
    SVCreateStbReq req = {0};
    req.name = me.name;
    req.suid = me.uid;
    req.schemaRow = me.stbEntry.schemaRow;
    req.schemaTag = me.stbEntry.schemaTag;
    req.schemaRow.version = 1;
    req.schemaTag.version = 1;

    ret = buildSuperTableInfo(&req, pBuf, contLen);
    *type = TDMT_VND_CREATE_STB;

503 504 505 506
  } else if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) ||
             (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid)) {
    STableInfoForChildTable* data =
        (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
wmmhello's avatar
wmmhello 已提交
507 508 509
    ASSERT(data);
    SVCreateTbReq req = {0};

wmmhello's avatar
wmmhello 已提交
510
    req.type = TSDB_CHILD_TABLE;
wmmhello's avatar
wmmhello 已提交
511 512 513 514
    req.name = me.name;
    req.uid = me.uid;
    req.commentLen = -1;
    req.ctb.suid = me.ctbEntry.suid;
wmmhello's avatar
wmmhello 已提交
515
    req.ctb.tagNum = data->tagRow->nCols;
516
    req.ctb.stbName = data->tableName;
wmmhello's avatar
wmmhello 已提交
517

wmmhello's avatar
wmmhello 已提交
518
    SArray* tagName = taosArrayInit(req.ctb.tagNum, TSDB_COL_NAME_LEN);
519 520
    STag*   p = (STag*)me.ctbEntry.pTags;
    if (tTagIsJson(p)) {
wmmhello's avatar
wmmhello 已提交
521 522 523 524
      if (p->nTag != 0) {
        SSchema* schema = &data->tagRow->pSchema[0];
        taosArrayPush(tagName, schema->name);
      }
525
    } else {
wmmhello's avatar
wmmhello 已提交
526 527 528 529 530 531 532
      SArray* pTagVals = NULL;
      if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
        ASSERT(0);
      }
      int16_t nCols = taosArrayGetSize(pTagVals);
      for (int j = 0; j < nCols; ++j) {
        STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
533 534 535
        for (int i = 0; i < data->tagRow->nCols; i++) {
          SSchema* schema = &data->tagRow->pSchema[i];
          if (schema->colId == pTagVal->cid) {
wmmhello's avatar
wmmhello 已提交
536 537 538 539
            taosArrayPush(tagName, schema->name);
          }
        }
      }
540
      taosArrayDestroy(pTagVals);
wmmhello's avatar
wmmhello 已提交
541
    }
542 543 544 545 546 547 548 549 550 551 552 553 554 555 556
    //    SIdInfo* sidInfo = (SIdInfo*)taosHashGet(ctx->idVersion, &me.ctbEntry.suid, sizeof(tb_uid_t));
    //    if(sidInfo->version >= idInfo->version){
    //      // need parse tag
    //      STag* p = (STag*)me.ctbEntry.pTags;
    //      SArray* pTagVals = NULL;
    //      if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
    //      }
    //
    //      int16_t nCols = taosArrayGetSize(pTagVals);
    //      for (int j = 0; j < nCols; ++j) {
    //        STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
    //      }
    //    }else{
    req.ctb.pTag = me.ctbEntry.pTags;
    //    }
wmmhello's avatar
wmmhello 已提交
557

wmmhello's avatar
wmmhello 已提交
558
    req.ctb.tagName = tagName;
wmmhello's avatar
wmmhello 已提交
559 560
    ret = buildNormalChildTableInfo(&req, pBuf, contLen);
    *type = TDMT_VND_CREATE_TABLE;
wmmhello's avatar
wmmhello 已提交
561
    taosArrayDestroy(tagName);
562
  } else if (ctx->subType == TOPIC_SUB_TYPE__DB) {
wmmhello's avatar
wmmhello 已提交
563
    SVCreateTbReq req = {0};
wmmhello's avatar
wmmhello 已提交
564
    req.type = TSDB_NORMAL_TABLE;
wmmhello's avatar
wmmhello 已提交
565 566 567 568 569 570
    req.name = me.name;
    req.uid = me.uid;
    req.commentLen = -1;
    req.ntb.schemaRow = me.ntbEntry.schemaRow;
    ret = buildNormalChildTableInfo(&req, pBuf, contLen);
    *type = TDMT_VND_CREATE_TABLE;
571
  } else {
wmmhello's avatar
wmmhello 已提交
572
    ASSERT(0);
573
  }
wmmhello's avatar
wmmhello 已提交
574
  tDecoderClear(&dc);
575 576 577 578

  return ret;
}

579
SMetaTableInfo getUidfromSnapShot(SSnapContext* ctx) {
580
  SMetaTableInfo result = {0};
581 582 583
  void*          pKey = NULL;
  void*          pVal = NULL;
  int            vLen, kLen;
584

585 586
  while (1) {
    if (ctx->index >= taosArrayGetSize(ctx->idList)) {
wmmhello's avatar
wmmhello 已提交
587
      metaDebug("tmqsnap get uid info end");
588 589
      return result;
    }
wmmhello's avatar
wmmhello 已提交
590 591 592 593
    int64_t* uidTmp = taosArrayGet(ctx->idList, ctx->index);
    ctx->index++;
    SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, uidTmp, sizeof(tb_uid_t));
    ASSERT(idInfo);
594

595
    int32_t ret = MoveToPosition(ctx, idInfo->version, *uidTmp);
596
    if (ret != 0) {
597 598 599
      metaDebug("tmqsnap getUidfromSnapShot not exist uid:%" PRIi64 " version:%" PRIi64, *uidTmp, idInfo->version);
      continue;
    }
wmmhello's avatar
wmmhello 已提交
600
    tdbTbcGet(ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
601 602 603 604
    SDecoder   dc = {0};
    SMetaEntry me = {0};
    tDecoderInit(&dc, pVal, vLen);
    metaDecodeEntry(&dc, &me);
605
    metaDebug("tmqsnap get uid info uid:%" PRIi64 " name:%s index:%d", me.uid, me.name, ctx->index - 1);
606

607 608 609
    if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) {
      STableInfoForChildTable* data =
          (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
610 611
      result.uid = me.uid;
      result.suid = me.ctbEntry.suid;
wmmhello's avatar
wmmhello 已提交
612 613
      result.schema = tCloneSSchemaWrapper(data->schemaRow);
      strcpy(result.tbName, me.name);
614 615 616 617 618
      tDecoderClear(&dc);
      break;
    } else if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_NORMAL_TABLE) {
      result.uid = me.uid;
      result.suid = 0;
wmmhello's avatar
wmmhello 已提交
619 620
      strcpy(result.tbName, me.name);
      result.schema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow);
621 622
      tDecoderClear(&dc);
      break;
623 624 625
    } else if (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid) {
      STableInfoForChildTable* data =
          (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
626 627
      result.uid = me.uid;
      result.suid = me.ctbEntry.suid;
wmmhello's avatar
wmmhello 已提交
628 629
      strcpy(result.tbName, me.name);
      result.schema = tCloneSSchemaWrapper(data->schemaRow);
630 631
      tDecoderClear(&dc);
      break;
632
    } else {
wmmhello's avatar
wmmhello 已提交
633
      metaDebug("tmqsnap get uid continue");
634 635 636 637 638 639 640
      tDecoderClear(&dc);
      continue;
    }
  }

  return result;
}