/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "meta.h"

// SMetaSnapReader ========================================
struct SMetaSnapReader {
H
Hongze Cheng 已提交
20 21 22
  SMeta*  pMeta;
  int64_t sver;
  int64_t ever;
H
Hongze Cheng 已提交
23
  TBC*    pTbc;
H
Hongze Cheng 已提交
24 25
};

// Create a snapshot reader over the version range [sver, ever] of pMeta.
//
// A cursor is opened on pMeta->pTbDb and positioned with the key
// {version = sver, uid = INT64_MIN}.
//
// On success *ppReader receives the new reader and 0 is returned.
// On failure *ppReader is set to NULL, every resource acquired so far is
// released, and the failing error code is returned.
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) {
  int32_t          code = 0;
  int32_t          c = 0;
  SMetaSnapReader* pReader = NULL;

  // alloc
  pReader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pReader->pMeta = pMeta;
  pReader->sver = sver;
  pReader->ever = ever;

  // impl
  code = tdbTbcOpen(pMeta->pTbDb, &pReader->pTbc, NULL);
  if (code) {
    taosMemoryFree(pReader);
    goto _err;
  }

  code = tdbTbcMoveTo(pReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c);
  if (code) {
    // The cursor was opened successfully above; close it before dropping the
    // reader, otherwise it is leaked.
    tdbTbcClose(pReader->pTbc);
    taosMemoryFree(pReader);
    goto _err;
  }

  metaInfo("vgId:%d, vnode snapshot meta reader opened", TD_VID(pMeta->pVnode));

  *ppReader = pReader;
  return code;

_err:
  metaError("vgId:%d, vnode snapshot meta reader open failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
  *ppReader = NULL;
  return code;
}

// Close the reader's cursor, free the reader and reset *ppReader to NULL.
// Always returns 0. *ppReader must point to a valid reader.
int32_t metaSnapReaderClose(SMetaSnapReader** ppReader) {
  SMetaSnapReader* pReader = *ppReader;

  tdbTbcClose(pReader->pTbc);
  taosMemoryFree(pReader);
  *ppReader = NULL;

  return 0;
}

// Read the next table entry inside the reader's version range.
//
// The cursor is advanced past every entry whose version is below sver. The
// first entry with sver <= version <= ever is copied into a freshly allocated
// buffer (SSnapDataHdr followed by the raw entry bytes) that is handed to the
// caller through *ppData.
//
// Returns 0 with *ppData == NULL when the cursor yields nothing more or the
// next entry is newer than ever; returns 0 with *ppData set on success;
// returns TSDB_CODE_OUT_OF_MEMORY (with *ppData == NULL) if the buffer cannot
// be allocated.
int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
  const void* pKey = NULL;
  const void* pData = NULL;
  int32_t     nKey = 0;
  int32_t     nData = 0;
  STbDbKey    key;

  *ppData = NULL;

  for (;;) {
    if (tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData)) {
      return 0;  // nothing left under the cursor
    }

    key = *(STbDbKey*)pKey;
    if (key.version > pReader->ever) {
      return 0;  // past the end of the requested range; cursor is not advanced
    }

    // The entry is consumed either way: skipped if too old, returned otherwise.
    tdbTbcMoveToNext(pReader->pTbc);
    if (key.version >= pReader->sver) {
      break;
    }
  }

  ASSERT(pData && nData);

  *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + nData);
  if (*ppData == NULL) {
    int32_t code = TSDB_CODE_OUT_OF_MEMORY;
    metaError("vgId:%d, vnode snapshot meta read data failed since %s", TD_VID(pReader->pMeta->pVnode),
              tstrerror(code));
    return code;
  }

  SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
  pHdr->type = SNAP_DATA_META;
  pHdr->size = nData;
  memcpy(pHdr->data, pData, nData);

  metaInfo("vgId:%d, vnode snapshot meta read data, version:%" PRId64 " uid:%" PRId64 " nData:%d",
           TD_VID(pReader->pMeta->pVnode), key.version, key.uid, nData);

  return 0;
}

// SMetaSnapWriter ========================================
// State of one meta snapshot write session (created by metaSnapWriterOpen,
// released by metaSnapWriterClose).
struct SMetaSnapWriter {
  SMeta*  pMeta;  // meta store that receives the snapshot entries
  int64_t sver;   // start version as passed to metaSnapWriterOpen
  int64_t ever;   // end version as passed to metaSnapWriterOpen
};

// Create a snapshot writer for pMeta and begin a meta transaction
// (metaBegin with META_BEGIN_HEAP_NIL).
//
// On success *ppWriter receives the writer and 0 is returned. If the writer
// cannot be allocated, *ppWriter is set to NULL and TSDB_CODE_OUT_OF_MEMORY is
// returned.
int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWriter** ppWriter) {
  SMetaSnapWriter* pWriter = (SMetaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));

  if (pWriter == NULL) {
    int32_t code = TSDB_CODE_OUT_OF_MEMORY;
    metaError("vgId:%d, meta snapshot writer open failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
    *ppWriter = NULL;
    return code;
  }

  pWriter->pMeta = pMeta;
  pWriter->sver = sver;
  pWriter->ever = ever;

  metaBegin(pMeta, META_BEGIN_HEAP_NIL);

  *ppWriter = pWriter;
  return 0;
}

// Finish a snapshot write session.
//
// rollback != 0: abort the meta transaction (metaAbort).
// rollback == 0: commit it (metaCommit, then metaFinishCommit).
//
// On success the writer is freed, *ppWriter is set to NULL and 0 is returned.
// On failure the error code is returned and the writer is left untouched
// (not freed, *ppWriter unchanged).
int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) {
  SMetaSnapWriter* pWriter = *ppWriter;
  SMeta*           pMeta = pWriter->pMeta;
  int32_t          code = 0;

  if (rollback) {
    metaInfo("vgId:%d, meta snapshot writer close and rollback start ", TD_VID(pWriter->pMeta->pVnode));
    code = metaAbort(pMeta);
    metaInfo("vgId:%d, meta snapshot writer close and rollback finished, code:0x%x", TD_VID(pWriter->pMeta->pVnode),
             code);
  } else {
    code = metaCommit(pMeta, pMeta->txn);
    if (code == 0) {
      // txn is re-read here on purpose: it is fetched again after metaCommit.
      code = metaFinishCommit(pMeta, pMeta->txn);
    }
  }

  if (code) {
    metaError("vgId:%d, meta snapshot writer close failed since %s", TD_VID(pWriter->pMeta->pVnode), tstrerror(code));
    return code;
  }

  taosMemoryFree(pWriter);
  *ppWriter = NULL;
  return code;
}

// Apply one snapshot record to the writer's meta store.
//
// pData/nData describe a buffer that starts with an SSnapDataHdr followed by
// an encoded SMetaEntry. The entry is decoded and passed to metaHandleEntry.
//
// Returns 0 on success, otherwise the non-zero result of metaDecodeEntry or
// metaHandleEntry. The decoder is cleared on every path.
int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
  int32_t    code = 0;
  SMeta*     pMeta = pWriter->pMeta;
  SMetaEntry metaEntry = {0};
  SDecoder*  pDecoder = &(SDecoder){0};

  tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));

  // Do not hand a half-decoded entry to metaHandleEntry.
  code = metaDecodeEntry(pDecoder, &metaEntry);
  if (code) goto _err;

  code = metaHandleEntry(pMeta, &metaEntry);
  if (code) goto _err;

  tDecoderClear(pDecoder);
  return code;

_err:
  // Release whatever the decoder allocated before reporting the failure.
  tDecoderClear(pDecoder);
  metaError("vgId:%d, vnode snapshot meta write failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
  return code;
}
// Super-table information cached per super-table uid (value type of the
// suidInfo hash). All three members are owned copies; they are released by
// destroySTableInfoForChildTable.
typedef struct STableInfoForChildTable {
  char*           tableName;  // copy of the super table's name
  SSchemaWrapper* schemaRow;  // clone of the super table's row schema
  SSchemaWrapper* tagRow;     // clone of the super table's tag schema
} STableInfoForChildTable;

static void destroySTableInfoForChildTable(void* data) {
  STableInfoForChildTable* pData = (STableInfoForChildTable*)data;
wmmhello's avatar
wmmhello 已提交
213
  taosMemoryFree(pData->tableName);
214
  tDeleteSSchemaWrapper(pData->schemaRow);
wmmhello's avatar
wmmhello 已提交
215
  tDeleteSSchemaWrapper(pData->tagRow);
216 217
}

// Reopen ctx->pCur and seek it to the key
// {version = ctx->snapVersion, uid = INT64_MAX}.
// When tdbTbcMoveTo reports a negative comparison result the cursor is
// stepped back by one entry.
// NOTE(review): presumably a negative result means the cursor landed on an
// entry greater than the seek key — confirm against tdbTbcMoveTo.
static void MoveToSnapShotVersion(SSnapContext* ctx) {
  STbDbKey seekKey = {.version = ctx->snapVersion, .uid = INT64_MAX};
  int      cmp = 0;

  tdbTbcClose(ctx->pCur);
  tdbTbcOpen(ctx->pMeta->pTbDb, &ctx->pCur, NULL);

  tdbTbcMoveTo(ctx->pCur, &seekKey, sizeof(seekKey), &cmp);
  if (cmp < 0) {
    tdbTbcMoveToPrev(ctx->pCur);
  }
}

// Reopen ctx->pCur and seek it to the key {ver, uid}.
// Returns the comparison result written by tdbTbcMoveTo; callers in this file
// treat 0 as "entry found".
static int32_t MoveToPosition(SSnapContext* ctx, int64_t ver, int64_t uid) {
  STbDbKey seekKey = {.version = ver, .uid = uid};
  int      cmp = 0;

  tdbTbcClose(ctx->pCur);
  tdbTbcOpen(ctx->pMeta->pTbDb, &ctx->pCur, NULL);
  tdbTbcMoveTo(ctx->pCur, &seekKey, sizeof(seekKey), &cmp);

  return cmp;
}

// Reopen ctx->pCur on the table DB and place it on the first entry.
static void MoveToFirst(SSnapContext* ctx) {
  TBC** ppCur = &ctx->pCur;

  tdbTbcClose(*ppCur);
  tdbTbcOpen(ctx->pMeta->pTbDb, ppCur, NULL);
  tdbTbcMoveToFirst(*ppCur);
}

// Cache the name, row schema and tag schema of super table `me` in suidInfo,
// keyed by me->uid. Does nothing if an entry for that uid already exists.
// The stored name and schemas are copies (strdup / tCloneSSchemaWrapper).
static void saveSuperTableInfoForChildTable(SMetaEntry* me, SHashObj* suidInfo) {
  if (taosHashGet(suidInfo, &me->uid, sizeof(tb_uid_t)) != NULL) {
    return;  // already cached
  }

  STableInfoForChildTable info = {0};
  info.tableName = strdup(me->name);
  info.schemaRow = tCloneSSchemaWrapper(&me->stbEntry.schemaRow);
  info.tagRow = tCloneSSchemaWrapper(&me->stbEntry.schemaTag);

  taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &info, sizeof(STableInfoForChildTable));
}

// Build a snapshot context for pMeta at snapVersion.
//
// Pass 1 walks the table DB forward from the first entry up to snapVersion
// and collects, in first-seen order, the uid of every table that still exists
// in pMeta->pUidIdx (and, for TOPIC_SUB_TYPE__TABLE, is not a super/child
// table of a different suid) into ctx->idList.
//
// Pass 2 walks backward from snapVersion and records, for every uid, the first
// version met in that direction in ctx->idVersion; matching super tables are
// cached in ctx->suidInfo.
//
// Finally each uid in idList gets its position stored in idVersion[uid].index.
//
// *ctxRet is assigned as soon as the context is allocated. Returns
// TDB_CODE_SUCCESS on success, -1 if the context or one of its hashes cannot
// be created.
int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta,
                         SSnapContext** ctxRet) {
  SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext));
  if (ctx == NULL) return -1;

  *ctxRet = ctx;
  ctx->pMeta = pMeta;
  ctx->snapVersion = snapVersion;
  ctx->suid = suid;
  ctx->subType = subType;
  ctx->queryMetaOrData = withMeta;
  ctx->withMeta = withMeta;

  ctx->idVersion = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  if (ctx->idVersion == NULL) {
    return -1;
  }

  ctx->suidInfo = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  if (ctx->suidInfo == NULL) {
    return -1;
  }
  taosHashSetFreeFp(ctx->suidInfo, destroySTableInfoForChildTable);

  ctx->index = 0;
  ctx->idList = taosArrayInit(100, sizeof(int64_t));

  void* pKey = NULL;
  void* pVal = NULL;
  int   vLen = 0, kLen = 0;

  metaDebug("tmqsnap init snapVersion:%" PRIi64, ctx->snapVersion);

  // Pass 1: forward scan, build idList.
  MoveToFirst(ctx);
  for (;;) {
    if (tdbTbcNext(ctx->pCur, &pKey, &kLen, &pVal, &vLen) < 0) break;

    STbDbKey* dbKey = (STbDbKey*)pKey;
    if (dbKey->version > ctx->snapVersion) break;

    // uid already collected
    if (taosHashGet(ctx->idVersion, &dbKey->uid, sizeof(tb_uid_t)) != NULL) {
      continue;
    }

    // check if table exist for now, need optimize later
    if (tdbTbGet(pMeta->pUidIdx, &dbKey->uid, sizeof(tb_uid_t), NULL, NULL) < 0) {
      continue;
    }

    SDecoder   dc = {0};
    SMetaEntry me = {0};
    tDecoderInit(&dc, pVal, vLen);
    metaDecodeEntry(&dc, &me);

    bool foreignTable = ctx->subType == TOPIC_SUB_TYPE__TABLE &&
                        ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
                         (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE));
    if (foreignTable) {
      tDecoderClear(&dc);
      continue;
    }

    taosArrayPush(ctx->idList, &dbKey->uid);
    metaDebug("tmqsnap init idlist name:%s, uid:%" PRIi64, me.name, dbKey->uid);
    SIdInfo seen = {0};
    taosHashPut(ctx->idVersion, &dbKey->uid, sizeof(tb_uid_t), &seen, sizeof(SIdInfo));

    tDecoderClear(&dc);
  }
  // idVersion only served as a "seen" set so far; it is rebuilt below.
  taosHashClear(ctx->idVersion);

  // Pass 2: backward scan from snapVersion, record the version per uid.
  MoveToSnapShotVersion(ctx);
  for (;;) {
    if (tdbTbcPrev(ctx->pCur, &pKey, &kLen, &pVal, &vLen) < 0) break;

    STbDbKey* dbKey = (STbDbKey*)pKey;
    if (taosHashGet(ctx->idVersion, &dbKey->uid, sizeof(tb_uid_t)) != NULL) {
      continue;
    }

    SIdInfo info = {.version = dbKey->version, .index = 0};
    taosHashPut(ctx->idVersion, &dbKey->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo));

    SDecoder   dc = {0};
    SMetaEntry me = {0};
    tDecoderInit(&dc, pVal, vLen);
    metaDecodeEntry(&dc, &me);

    bool foreignTable = ctx->subType == TOPIC_SUB_TYPE__TABLE &&
                        ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
                         (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE));
    if (foreignTable) {
      tDecoderClear(&dc);
      continue;
    }

    bool wantedStb = (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) ||
                     (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid);
    if (wantedStb) {
      saveSuperTableInfoForChildTable(&me, ctx->suidInfo);
    }
    tDecoderClear(&dc);
  }

  // Store each uid's position in idList next to its version.
  for (int i = 0; i < taosArrayGetSize(ctx->idList); i++) {
    int64_t* uid = taosArrayGet(ctx->idList, i);
    SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, uid, sizeof(int64_t));
    ASSERT(idData);
    idData->index = i;
    metaDebug("tmqsnap init idVersion uid:%" PRIi64 " version:%" PRIi64 " index:%d", *uid, idData->version,
              idData->index);
  }

  tdbFree(pKey);
  tdbFree(pVal);
  return TDB_CODE_SUCCESS;
}

// Release everything owned by a snapshot context: its cursor, the uid list,
// both hash tables, and finally the context itself. Always returns 0.
int32_t destroySnapContext(SSnapContext* ctx) {
  // cursor
  tdbTbcClose(ctx->pCur);

  // containers
  taosArrayDestroy(ctx->idList);
  taosHashCleanup(ctx->idVersion);
  taosHashCleanup(ctx->suidInfo);

  // the context itself
  taosMemoryFree(ctx);

  return 0;
}

// Serialize a single create-table request as a one-element
// SVCreateTbBatchReq message.
//
// On success *pBuf points to a newly allocated buffer of *contLen bytes: room
// for an SMsgHead (left unwritten here) followed by the encoded batch request.
// The caller owns *pBuf.
//
// Returns 0 on success, -1 on allocation or encoding failure. When encoding
// into the buffer fails, the buffer is freed and *pBuf is cleared.
static int32_t buildNormalChildTableInfo(SVCreateTbReq* req, void** pBuf, int32_t* contLen) {
  int32_t            ret = 0;
  SVCreateTbBatchReq reqs = {0};

  reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
  if (NULL == reqs.pArray) {
    ret = -1;
    goto end;
  }
  // A failed push would leave nReqs == 1 with an empty array.
  if (NULL == taosArrayPush(reqs.pArray, req)) {
    ret = -1;
    goto end;
  }
  reqs.nReqs = 1;

  tEncodeSize(tEncodeSVCreateTbBatchReq, &reqs, *contLen, ret);
  if (ret < 0) {
    ret = -1;
    goto end;
  }

  // Size of the encoded body alone; this is the encoder's real capacity.
  int32_t bodyLen = *contLen;

  *contLen += sizeof(SMsgHead);
  *pBuf = taosMemoryMalloc(*contLen);
  if (NULL == *pBuf) {
    ret = -1;
    goto end;
  }

  SEncoder coder = {0};
  // The encoder starts after the header, so only bodyLen bytes are writable.
  // Passing *contLen would overstate the capacity by sizeof(SMsgHead).
  tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), bodyLen);
  if (tEncodeSVCreateTbBatchReq(&coder, &reqs) < 0) {
    taosMemoryFreeClear(*pBuf);
    tEncoderClear(&coder);
    ret = -1;
    goto end;
  }
  tEncoderClear(&coder);

end:
  taosArrayDestroy(reqs.pArray);
  return ret;
}

// Serialize a create-super-table request.
//
// On success *pBuf points to a newly allocated buffer of *contLen bytes: room
// for an SMsgHead (left unwritten here) followed by the encoded request. The
// caller owns *pBuf.
//
// Returns 0 on success, -1 on allocation or encoding failure. When encoding
// into the buffer fails, the buffer is freed and *pBuf is cleared.
static int32_t buildSuperTableInfo(SVCreateStbReq* req, void** pBuf, int32_t* contLen) {
  int32_t ret = 0;
  tEncodeSize(tEncodeSVCreateStbReq, req, *contLen, ret);
  if (ret < 0) {
    return -1;
  }

  // Size of the encoded body alone; this is the encoder's real capacity.
  int32_t bodyLen = *contLen;

  *contLen += sizeof(SMsgHead);
  *pBuf = taosMemoryMalloc(*contLen);
  if (NULL == *pBuf) {
    return -1;
  }

  SEncoder encoder = {0};
  // The encoder starts after the header, so only bodyLen bytes are writable.
  // Passing *contLen would overstate the capacity by sizeof(SMsgHead).
  tEncoderInit(&encoder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), bodyLen);
  if (tEncodeSVCreateStbReq(&encoder, req) < 0) {
    taosMemoryFreeClear(*pBuf);
    tEncoderClear(&encoder);
    return -1;
  }
  tEncoderClear(&encoder);
  return 0;
}

// Reposition the context's iteration index.
//
// uid == 0 rewinds to the start of the uid list. Any other uid moves the index
// to the position recorded for that uid in ctx->idVersion.
// Returns 0 on success, -1 if the uid is not known to the context (the index
// is left unchanged in that case).
int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) {
  if (uid == 0) {
    ctx->index = 0;
    return 0;
  }

  SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, &uid, sizeof(tb_uid_t));
  if (idInfo == NULL) {
    return -1;
  }

  ctx->index = idInfo->index;
  return 0;
}

// Produce the create-table message for the next table in the snapshot.
//
// Starting at ctx->index, uids are taken from ctx->idList until one is found
// whose recorded (version, uid) key can be located in the table DB. That entry
// is decoded and turned into a message:
//   - super table (DB subscription) or the subscribed super table itself
//     -> TDMT_VND_CREATE_STB via buildSuperTableInfo (schema versions forced
//        to 1);
//   - child table (of the subscribed super table when subscribing to a table)
//     -> TDMT_VND_CREATE_TABLE with tag names resolved from the cached super
//        table tag schema;
//   - any other table under a DB subscription
//     -> TDMT_VND_CREATE_TABLE for a normal table.
//
// Outputs: *uid is set to each uid tried; *pBuf, *contLen and *type are set
// once a message is built.
//
// Returns 0 without producing a message when the uid list is exhausted; in
// that case ctx->index is reset to 0 and ctx->queryMetaOrData is cleared.
// Otherwise returns the result of the message builder.
int32_t getMetafromSnapShot(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid) {
  int32_t ret = 0;
  void*   pKey = NULL;
  void*   pVal = NULL;
  int     vLen = 0, kLen = 0;

  for (;;) {
    if (ctx->index >= taosArrayGetSize(ctx->idList)) {
      metaDebug("tmqsnap get meta end");
      ctx->index = 0;
      ctx->queryMetaOrData = false;  // change to get data
      return 0;
    }

    int64_t* pUid = taosArrayGet(ctx->idList, ctx->index);
    ctx->index++;
    SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, pUid, sizeof(tb_uid_t));
    ASSERT(idInfo);

    *uid = *pUid;
    ret = MoveToPosition(ctx, idInfo->version, *pUid);
    if (ret == 0) {
      break;
    }
    metaDebug("tmqsnap get meta not exist uid:%" PRIi64 " version:%" PRIi64, *uid, idInfo->version);
  }

  tdbTbcGet(ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
  SDecoder   dc = {0};
  SMetaEntry me = {0};
  tDecoderInit(&dc, pVal, vLen);
  metaDecodeEntry(&dc, &me);
  metaDebug("tmqsnap get meta uid:%" PRIi64 " name:%s index:%d", *uid, me.name, ctx->index - 1);

  const bool dbSub = (ctx->subType == TOPIC_SUB_TYPE__DB);
  const bool tbSub = (ctx->subType == TOPIC_SUB_TYPE__TABLE);

  if ((dbSub && me.type == TSDB_SUPER_TABLE) || (tbSub && me.uid == ctx->suid)) {
    SVCreateStbReq req = {0};
    req.name = me.name;
    req.suid = me.uid;
    req.schemaRow = me.stbEntry.schemaRow;
    req.schemaTag = me.stbEntry.schemaTag;
    req.schemaRow.version = 1;
    req.schemaTag.version = 1;

    ret = buildSuperTableInfo(&req, pBuf, contLen);
    *type = TDMT_VND_CREATE_STB;

  } else if (me.type == TSDB_CHILD_TABLE && (dbSub || (tbSub && me.ctbEntry.suid == ctx->suid))) {
    STableInfoForChildTable* data =
        (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
    ASSERT(data);

    SVCreateTbReq req = {0};
    req.type = TSDB_CHILD_TABLE;
    req.name = me.name;
    req.uid = me.uid;
    req.commentLen = -1;
    req.ctb.suid = me.ctbEntry.suid;
    req.ctb.tagNum = data->tagRow->nCols;
    req.ctb.stbName = data->tableName;

    // Collect the names of the tags that are actually present on this child
    // table, using the super table's tag schema.
    SArray* tagName = taosArrayInit(req.ctb.tagNum, TSDB_COL_NAME_LEN);
    STag*   p = (STag*)me.ctbEntry.pTags;
    if (tTagIsJson(p)) {
      // A JSON tag uses the first tag column of the schema.
      if (p->nTag != 0) {
        taosArrayPush(tagName, data->tagRow->pSchema[0].name);
      }
    } else {
      SArray* pTagVals = NULL;
      if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
        ASSERT(0);
      }
      int16_t nCols = taosArrayGetSize(pTagVals);
      for (int j = 0; j < nCols; ++j) {
        STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
        for (int i = 0; i < data->tagRow->nCols; i++) {
          SSchema* schema = &data->tagRow->pSchema[i];
          if (schema->colId == pTagVal->cid) {
            taosArrayPush(tagName, schema->name);
          }
        }
      }
      taosArrayDestroy(pTagVals);
    }

    req.ctb.pTag = me.ctbEntry.pTags;
    req.ctb.tagName = tagName;

    ret = buildNormalChildTableInfo(&req, pBuf, contLen);
    *type = TDMT_VND_CREATE_TABLE;
    taosArrayDestroy(tagName);

  } else if (dbSub) {
    SVCreateTbReq req = {0};
    req.type = TSDB_NORMAL_TABLE;
    req.name = me.name;
    req.uid = me.uid;
    req.commentLen = -1;
    req.ntb.schemaRow = me.ntbEntry.schemaRow;

    ret = buildNormalChildTableInfo(&req, pBuf, contLen);
    *type = TDMT_VND_CREATE_TABLE;

  } else {
    ASSERT(0);
  }

  tDecoderClear(&dc);
  return ret;
}

// Return uid, suid, name and a cloned row schema of the next data-bearing
// table in the snapshot.
//
// Starting at ctx->index, uids are taken from ctx->idList; entries that cannot
// be located in the table DB, and entries that are neither a child table nor
// (under a DB subscription) a normal table, are skipped. For child tables the
// schema is cloned from the cached super table info; for normal tables it is
// cloned from the entry itself.
//
// When the uid list is exhausted a zero-initialized SMetaTableInfo is
// returned. The caller owns result.schema.
SMetaTableInfo getUidfromSnapShot(SSnapContext* ctx) {
  SMetaTableInfo result = {0};
  void*          pKey = NULL;
  void*          pVal = NULL;
  int            vLen = 0, kLen = 0;

  for (;;) {
    if (ctx->index >= taosArrayGetSize(ctx->idList)) {
      metaDebug("tmqsnap get uid info end");
      return result;
    }

    int64_t* pUid = taosArrayGet(ctx->idList, ctx->index);
    ctx->index++;
    SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, pUid, sizeof(tb_uid_t));
    ASSERT(idInfo);

    if (MoveToPosition(ctx, idInfo->version, *pUid) != 0) {
      metaDebug("tmqsnap getUidfromSnapShot not exist uid:%" PRIi64 " version:%" PRIi64, *pUid, idInfo->version);
      continue;
    }

    tdbTbcGet(ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
    SDecoder   dc = {0};
    SMetaEntry me = {0};
    tDecoderInit(&dc, pVal, vLen);
    metaDecodeEntry(&dc, &me);
    metaDebug("tmqsnap get uid info uid:%" PRIi64 " name:%s index:%d", me.uid, me.name, ctx->index - 1);

    const bool dbSub = (ctx->subType == TOPIC_SUB_TYPE__DB);
    const bool tbSub = (ctx->subType == TOPIC_SUB_TYPE__TABLE);

    if (me.type == TSDB_CHILD_TABLE && (dbSub || (tbSub && me.ctbEntry.suid == ctx->suid))) {
      // Child table: the row schema lives with its super table.
      STableInfoForChildTable* data =
          (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
      result.uid = me.uid;
      result.suid = me.ctbEntry.suid;
      strcpy(result.tbName, me.name);
      result.schema = tCloneSSchemaWrapper(data->schemaRow);
      tDecoderClear(&dc);
      break;
    }

    if (dbSub && me.type == TSDB_NORMAL_TABLE) {
      result.uid = me.uid;
      result.suid = 0;
      strcpy(result.tbName, me.name);
      result.schema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow);
      tDecoderClear(&dc);
      break;
    }

    metaDebug("tmqsnap get uid continue");
    tDecoderClear(&dc);
  }

  return result;
}