metaSnapshot.c 19.5 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "meta.h"

H
Hongze Cheng 已提交
18
// SMetaSnapReader ========================================
H
Hongze Cheng 已提交
19
struct SMetaSnapReader {
H
Hongze Cheng 已提交
20 21 22
  SMeta*  pMeta;
  int64_t sver;
  int64_t ever;
H
Hongze Cheng 已提交
23
  TBC*    pTbc;
H
Hongze Cheng 已提交
24 25
};

H
Hongze Cheng 已提交
26
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) {
H
Hongze Cheng 已提交
27 28
  int32_t          code = 0;
  int32_t          c = 0;
H
Hongze Cheng 已提交
29
  SMetaSnapReader* pReader = NULL;
H
Hongze Cheng 已提交
30

H
Hongze Cheng 已提交
31
  // alloc
H
Hongze Cheng 已提交
32 33
  pReader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
H
Hongze Cheng 已提交
34 35 36
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
H
Hongze Cheng 已提交
37 38 39
  pReader->pMeta = pMeta;
  pReader->sver = sver;
  pReader->ever = ever;
H
Hongze Cheng 已提交
40 41

  // impl
H
Hongze Cheng 已提交
42
  code = tdbTbcOpen(pMeta->pTbDb, &pReader->pTbc, NULL);
H
Hongze Cheng 已提交
43
  if (code) {
H
Hongze Cheng 已提交
44
    taosMemoryFree(pReader);
H
Hongze Cheng 已提交
45 46 47
    goto _err;
  }

H
Hongze Cheng 已提交
48
  code = tdbTbcMoveTo(pReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c);
H
Hongze Cheng 已提交
49
  if (code) {
H
Hongze Cheng 已提交
50
    taosMemoryFree(pReader);
H
Hongze Cheng 已提交
51 52 53
    goto _err;
  }

S
Shengliang Guan 已提交
54
  metaInfo("vgId:%d, vnode snapshot meta reader opened", TD_VID(pMeta->pVnode));
H
Hongze Cheng 已提交
55 56

  *ppReader = pReader;
H
Hongze Cheng 已提交
57 58 59
  return code;

_err:
S
Shengliang Guan 已提交
60
  metaError("vgId:%d, vnode snapshot meta reader open failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
61 62
  *ppReader = NULL;
  return code;
H
Hongze Cheng 已提交
63 64
}

H
Hongze Cheng 已提交
65
int32_t metaSnapReaderClose(SMetaSnapReader** ppReader) {
H
Hongze Cheng 已提交
66 67
  int32_t code = 0;

H
Hongze Cheng 已提交
68 69 70
  tdbTbcClose((*ppReader)->pTbc);
  taosMemoryFree(*ppReader);
  *ppReader = NULL;
H
Hongze Cheng 已提交
71 72

  return code;
H
Hongze Cheng 已提交
73 74
}

H
Hongze Cheng 已提交
75
int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
H
Hongze Cheng 已提交
76
  int32_t     code = 0;
H
Hongze Cheng 已提交
77 78 79 80
  const void* pKey = NULL;
  const void* pData = NULL;
  int32_t     nKey = 0;
  int32_t     nData = 0;
H
Hongze Cheng 已提交
81
  STbDbKey    key;
H
Hongze Cheng 已提交
82

H
Hongze Cheng 已提交
83
  *ppData = NULL;
H
Hongze Cheng 已提交
84
  for (;;) {
H
Hongze Cheng 已提交
85
    if (tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData)) {
H
Hongze Cheng 已提交
86
      goto _exit;
H
Hongze Cheng 已提交
87 88
    }

H
Hongze Cheng 已提交
89 90 91 92 93 94
    key = ((STbDbKey*)pKey)[0];
    if (key.version > pReader->ever) {
      goto _exit;
    }

    if (key.version < pReader->sver) {
H
Hongze Cheng 已提交
95
      tdbTbcMoveToNext(pReader->pTbc);
H
Hongze Cheng 已提交
96 97 98
      continue;
    }

H
Hongze Cheng 已提交
99
    tdbTbcMoveToNext(pReader->pTbc);
H
Hongze Cheng 已提交
100 101 102
    break;
  }

103 104 105 106
  if (!pData || !nData) {
    metaError("meta/snap: invalide nData: %" PRId32 " meta snap read failed.", nData);
    goto _exit;
  }
H
Hongze Cheng 已提交
107 108 109

  *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + nData);
  if (*ppData == NULL) {
H
Hongze Cheng 已提交
110
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
111
    goto _err;
H
Hongze Cheng 已提交
112
  }
H
Hongze Cheng 已提交
113 114

  SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
W
wenzhouwww 已提交
115
  pHdr->type = SNAP_DATA_META;
H
Hongze Cheng 已提交
116 117 118
  pHdr->size = nData;
  memcpy(pHdr->data, pData, nData);

119 120
  metaDebug("vgId:%d, vnode snapshot meta read data, version:%" PRId64 " uid:%" PRId64 " blockLen:%d",
            TD_VID(pReader->pMeta->pVnode), key.version, key.uid, nData);
H
Hongze Cheng 已提交
121 122

_exit:
H
Hongze Cheng 已提交
123
  return code;
H
Hongze Cheng 已提交
124 125

_err:
S
Shengliang Guan 已提交
126
  metaError("vgId:%d, vnode snapshot meta read data failed since %s", TD_VID(pReader->pMeta->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
127
  return code;
H
Hongze Cheng 已提交
128 129
}

H
Hongze Cheng 已提交
130 131 132 133 134 135 136
// SMetaSnapWriter ========================================
struct SMetaSnapWriter {
  SMeta*  pMeta;
  int64_t sver;
  int64_t ever;
};

H
Hongze Cheng 已提交
137 138 139 140 141 142 143 144 145 146 147 148 149 150
int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWriter** ppWriter) {
  int32_t          code = 0;
  SMetaSnapWriter* pWriter;

  // alloc
  pWriter = (SMetaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
  if (pWriter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pWriter->pMeta = pMeta;
  pWriter->sver = sver;
  pWriter->ever = ever;

151
  metaBegin(pMeta, META_BEGIN_HEAP_NIL);
H
Hongze Cheng 已提交
152

H
Hongze Cheng 已提交
153 154 155 156
  *ppWriter = pWriter;
  return code;

_err:
S
Shengliang Guan 已提交
157
  metaError("vgId:%d, meta snapshot writer open failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
158 159 160 161 162 163 164 165 166
  *ppWriter = NULL;
  return code;
}

int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) {
  int32_t          code = 0;
  SMetaSnapWriter* pWriter = *ppWriter;

  if (rollback) {
167
    metaInfo("vgId:%d, meta snapshot writer close and rollback start ", TD_VID(pWriter->pMeta->pVnode));
168
    code = metaAbort(pWriter->pMeta);
169 170
    metaInfo("vgId:%d, meta snapshot writer close and rollback finished, code:0x%x", TD_VID(pWriter->pMeta->pVnode),
             code);
171
    if (code) goto _err;
H
Hongze Cheng 已提交
172
  } else {
173
    code = metaCommit(pWriter->pMeta, pWriter->pMeta->txn);
H
Hongze Cheng 已提交
174
    if (code) goto _err;
175
    code = metaFinishCommit(pWriter->pMeta, pWriter->pMeta->txn);
176
    if (code) goto _err;
H
Hongze Cheng 已提交
177 178 179 180 181 182 183
  }
  taosMemoryFree(pWriter);
  *ppWriter = NULL;

  return code;

_err:
S
Shengliang Guan 已提交
184
  metaError("vgId:%d, meta snapshot writer close failed since %s", TD_VID(pWriter->pMeta->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
185 186 187
  return code;
}

H
Hongze Cheng 已提交
188
int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
H
Hongze Cheng 已提交
189 190 191 192 193
  int32_t    code = 0;
  SMeta*     pMeta = pWriter->pMeta;
  SMetaEntry metaEntry = {0};
  SDecoder*  pDecoder = &(SDecoder){0};

H
Hongze Cheng 已提交
194
  tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
H
Hongze Cheng 已提交
195 196 197 198 199
  metaDecodeEntry(pDecoder, &metaEntry);

  code = metaHandleEntry(pMeta, &metaEntry);
  if (code) goto _err;

H
Hongze Cheng 已提交
200
  tDecoderClear(pDecoder);
H
Hongze Cheng 已提交
201 202 203
  return code;

_err:
S
Shengliang Guan 已提交
204
  metaError("vgId:%d, vnode snapshot meta write failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
205
  return code;
W
wenzhouwww 已提交
206
}
207

208 209 210 211 212
typedef struct STableInfoForChildTable {
  char*           tableName;
  SSchemaWrapper* schemaRow;
  SSchemaWrapper* tagRow;
} STableInfoForChildTable;
213 214 215

static void destroySTableInfoForChildTable(void* data) {
  STableInfoForChildTable* pData = (STableInfoForChildTable*)data;
wmmhello's avatar
wmmhello 已提交
216
  taosMemoryFree(pData->tableName);
217
  tDeleteSSchemaWrapper(pData->schemaRow);
wmmhello's avatar
wmmhello 已提交
218
  tDeleteSSchemaWrapper(pData->tagRow);
219 220
}

221
static void MoveToSnapShotVersion(SSnapContext* ctx) {
wmmhello's avatar
wmmhello 已提交
222 223 224
  tdbTbcClose(ctx->pCur);
  tdbTbcOpen(ctx->pMeta->pTbDb, &ctx->pCur, NULL);
  STbDbKey key = {.version = ctx->snapVersion, .uid = INT64_MAX};
225
  int      c = 0;
wmmhello's avatar
wmmhello 已提交
226
  tdbTbcMoveTo(ctx->pCur, &key, sizeof(key), &c);
227
  if (c < 0) {
wmmhello's avatar
wmmhello 已提交
228 229 230 231
    tdbTbcMoveToPrev(ctx->pCur);
  }
}

232
static int32_t MoveToPosition(SSnapContext* ctx, int64_t ver, int64_t uid) {
wmmhello's avatar
wmmhello 已提交
233 234 235
  tdbTbcClose(ctx->pCur);
  tdbTbcOpen(ctx->pMeta->pTbDb, &ctx->pCur, NULL);
  STbDbKey key = {.version = ver, .uid = uid};
236
  int      c = 0;
wmmhello's avatar
wmmhello 已提交
237
  tdbTbcMoveTo(ctx->pCur, &key, sizeof(key), &c);
238
  return c;
wmmhello's avatar
wmmhello 已提交
239 240
}

241
static void MoveToFirst(SSnapContext* ctx) {
wmmhello's avatar
wmmhello 已提交
242 243 244 245 246
  tdbTbcClose(ctx->pCur);
  tdbTbcOpen(ctx->pMeta->pTbDb, &ctx->pCur, NULL);
  tdbTbcMoveToFirst(ctx->pCur);
}

247
static void saveSuperTableInfoForChildTable(SMetaEntry* me, SHashObj* suidInfo) {
wmmhello's avatar
wmmhello 已提交
248
  STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(suidInfo, &me->uid, sizeof(tb_uid_t));
249
  if (data) {
wmmhello's avatar
wmmhello 已提交
250 251 252 253
    return;
  }
  STableInfoForChildTable dataTmp = {0};
  dataTmp.tableName = strdup(me->name);
wmmhello's avatar
wmmhello 已提交
254

wmmhello's avatar
wmmhello 已提交
255 256 257 258 259
  dataTmp.schemaRow = tCloneSSchemaWrapper(&me->stbEntry.schemaRow);
  dataTmp.tagRow = tCloneSSchemaWrapper(&me->stbEntry.schemaTag);
  taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable));
}

260 261
int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta,
                         SSnapContext** ctxRet) {
wmmhello's avatar
wmmhello 已提交
262
  SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext));
263
  if (ctx == NULL) return -1;
wmmhello's avatar
wmmhello 已提交
264
  *ctxRet = ctx;
265 266 267 268 269
  ctx->pMeta = pMeta;
  ctx->snapVersion = snapVersion;
  ctx->suid = suid;
  ctx->subType = subType;
  ctx->queryMetaOrData = withMeta;
wmmhello's avatar
wmmhello 已提交
270
  ctx->withMeta = withMeta;
271
  ctx->idVersion = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
272
  if (ctx->idVersion == NULL) {
273 274 275 276
    return -1;
  }

  ctx->suidInfo = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
277
  if (ctx->suidInfo == NULL) {
278 279 280 281
    return -1;
  }
  taosHashSetFreeFp(ctx->suidInfo, destroySTableInfoForChildTable);

wmmhello's avatar
wmmhello 已提交
282 283
  ctx->index = 0;
  ctx->idList = taosArrayInit(100, sizeof(int64_t));
284 285
  void* pKey = NULL;
  void* pVal = NULL;
wmmhello's avatar
wmmhello 已提交
286
  int   vLen = 0, kLen = 0;
287

wmmhello's avatar
wmmhello 已提交
288 289
  metaDebug("tmqsnap init snapVersion:%" PRIi64, ctx->snapVersion);
  MoveToFirst(ctx);
290
  while (1) {
wmmhello's avatar
wmmhello 已提交
291 292
    int32_t ret = tdbTbcNext(ctx->pCur, &pKey, &kLen, &pVal, &vLen);
    if (ret < 0) break;
293
    STbDbKey* tmp = (STbDbKey*)pKey;
wmmhello's avatar
wmmhello 已提交
294 295
    if (tmp->version > ctx->snapVersion) break;

296
    SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
297
    if (idData) {
298 299 300
      continue;
    }

301 302
    if (tdbTbGet(pMeta->pUidIdx, &tmp->uid, sizeof(tb_uid_t), NULL, NULL) <
        0) {  // check if table exist for now, need optimize later
303 304 305
      continue;
    }

wmmhello's avatar
wmmhello 已提交
306 307 308 309
    SDecoder   dc = {0};
    SMetaEntry me = {0};
    tDecoderInit(&dc, pVal, vLen);
    metaDecodeEntry(&dc, &me);
310
    if (ctx->subType == TOPIC_SUB_TYPE__TABLE) {
wmmhello's avatar
wmmhello 已提交
311
      if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
312
          (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) {
313
        tDecoderClear(&dc);
wmmhello's avatar
wmmhello 已提交
314 315 316
        continue;
      }
    }
317 318 319 320 321 322 323

    taosArrayPush(ctx->idList, &tmp->uid);
    metaDebug("tmqsnap init idlist name:%s, uid:%" PRIi64, me.name, tmp->uid);
    SIdInfo info = {0};
    taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo));

    tDecoderClear(&dc);
wmmhello's avatar
wmmhello 已提交
324 325 326 327
  }
  taosHashClear(ctx->idVersion);

  MoveToSnapShotVersion(ctx);
328
  while (1) {
wmmhello's avatar
wmmhello 已提交
329
    int32_t ret = tdbTbcPrev(ctx->pCur, &pKey, &kLen, &pVal, &vLen);
330 331
    if (ret < 0) break;

332 333 334
    STbDbKey* tmp = (STbDbKey*)pKey;
    SIdInfo*  idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
    if (idData) {
335
      continue;
wmmhello's avatar
wmmhello 已提交
336
    }
337 338
    SIdInfo info = {.version = tmp->version, .index = 0};
    taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo));
wmmhello's avatar
wmmhello 已提交
339 340 341 342 343

    SDecoder   dc = {0};
    SMetaEntry me = {0};
    tDecoderInit(&dc, pVal, vLen);
    metaDecodeEntry(&dc, &me);
344
    if (ctx->subType == TOPIC_SUB_TYPE__TABLE) {
wmmhello's avatar
wmmhello 已提交
345
      if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
346
          (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) {
347
        tDecoderClear(&dc);
wmmhello's avatar
wmmhello 已提交
348 349 350 351
        continue;
      }
    }

352 353
    if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) ||
        (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
wmmhello's avatar
wmmhello 已提交
354 355 356 357 358
      saveSuperTableInfoForChildTable(&me, ctx->suidInfo);
    }
    tDecoderClear(&dc);
  }

359 360
  for (int i = 0; i < taosArrayGetSize(ctx->idList); i++) {
    int64_t* uid = taosArrayGet(ctx->idList, i);
wmmhello's avatar
wmmhello 已提交
361
    SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, uid, sizeof(int64_t));
362 363 364 365 366
    if (!idData) {
      metaError("meta/snap: null idData");
      return TSDB_CODE_FAILED;
    }

wmmhello's avatar
wmmhello 已提交
367
    idData->index = i;
368 369
    metaDebug("tmqsnap init idVersion uid:%" PRIi64 " version:%" PRIi64 " index:%d", *uid, idData->version,
              idData->index);
370
  }
wmmhello's avatar
wmmhello 已提交
371

372 373
  tdbFree(pKey);
  tdbFree(pVal);
374 375 376
  return TDB_CODE_SUCCESS;
}

377
int32_t destroySnapContext(SSnapContext* ctx) {
378
  tdbTbcClose(ctx->pCur);
wmmhello's avatar
wmmhello 已提交
379
  taosArrayDestroy(ctx->idList);
380 381
  taosHashCleanup(ctx->idVersion);
  taosHashCleanup(ctx->suidInfo);
wmmhello's avatar
wmmhello 已提交
382
  taosMemoryFree(ctx);
383 384 385
  return 0;
}

386 387
static int32_t buildNormalChildTableInfo(SVCreateTbReq* req, void** pBuf, int32_t* contLen) {
  int32_t            ret = 0;
wmmhello's avatar
wmmhello 已提交
388
  SVCreateTbBatchReq reqs = {0};
389 390

  reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
391
  if (NULL == reqs.pArray) {
392 393 394
    ret = -1;
    goto end;
  }
wmmhello's avatar
wmmhello 已提交
395
  taosArrayPush(reqs.pArray, req);
396 397 398
  reqs.nReqs = 1;

  tEncodeSize(tEncodeSVCreateTbBatchReq, &reqs, *contLen, ret);
399
  if (ret < 0) {
400 401 402 403 404 405 406 407 408 409
    ret = -1;
    goto end;
  }
  *contLen += sizeof(SMsgHead);
  *pBuf = taosMemoryMalloc(*contLen);
  if (NULL == *pBuf) {
    ret = -1;
    goto end;
  }
  SEncoder coder = {0};
wmmhello's avatar
wmmhello 已提交
410
  tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), *contLen);
411 412 413 414 415 416 417 418 419 420 421 422 423
  if (tEncodeSVCreateTbBatchReq(&coder, &reqs) < 0) {
    taosMemoryFreeClear(*pBuf);
    tEncoderClear(&coder);
    ret = -1;
    goto end;
  }
  tEncoderClear(&coder);

end:
  taosArrayDestroy(reqs.pArray);
  return ret;
}

424
static int32_t buildSuperTableInfo(SVCreateStbReq* req, void** pBuf, int32_t* contLen) {
425 426 427 428 429 430 431 432 433 434 435 436
  int32_t ret = 0;
  tEncodeSize(tEncodeSVCreateStbReq, req, *contLen, ret);
  if (ret < 0) {
    return -1;
  }

  *contLen += sizeof(SMsgHead);
  *pBuf = taosMemoryMalloc(*contLen);
  if (NULL == *pBuf) {
    return -1;
  }

437
  SEncoder encoder = {0};
wmmhello's avatar
wmmhello 已提交
438
  tEncoderInit(&encoder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), *contLen);
439 440 441 442 443 444 445 446 447
  if (tEncodeSVCreateStbReq(&encoder, req) < 0) {
    taosMemoryFreeClear(*pBuf);
    tEncoderClear(&encoder);
    return -1;
  }
  tEncoderClear(&encoder);
  return 0;
}

448
int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) {
449
  int c = 0;
wmmhello's avatar
wmmhello 已提交
450

451
  if (uid == 0) {
wmmhello's avatar
wmmhello 已提交
452
    ctx->index = 0;
453 454 455
    return c;
  }

wmmhello's avatar
wmmhello 已提交
456
  SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, &uid, sizeof(tb_uid_t));
457
  if (!idInfo) {
458 459 460
    return -1;
  }

wmmhello's avatar
wmmhello 已提交
461
  ctx->index = idInfo->index;
462 463 464 465

  return c;
}

466
int32_t getMetafromSnapShot(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid) {
467
  int32_t ret = 0;
468 469 470
  void*   pKey = NULL;
  void*   pVal = NULL;
  int     vLen = 0, kLen = 0;
471

472 473
  while (1) {
    if (ctx->index >= taosArrayGetSize(ctx->idList)) {
474 475
      metaDebug("tmqsnap get meta end");
      ctx->index = 0;
476
      ctx->queryMetaOrData = false;  // change to get data
477 478
      return 0;
    }
479

480 481 482
    int64_t* uidTmp = taosArrayGet(ctx->idList, ctx->index);
    ctx->index++;
    SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, uidTmp, sizeof(tb_uid_t));
483 484 485 486
    if (!idInfo) {
      metaError("meta/snap: null idInfo");
      return TSDB_CODE_FAILED;
    }
487 488 489

    *uid = *uidTmp;
    ret = MoveToPosition(ctx, idInfo->version, *uidTmp);
490
    if (ret == 0) {
491 492 493 494
      break;
    }
    metaDebug("tmqsnap get meta not exist uid:%" PRIi64 " version:%" PRIi64, *uid, idInfo->version);
  }
wmmhello's avatar
wmmhello 已提交
495 496 497 498 499 500

  tdbTbcGet(ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
  SDecoder   dc = {0};
  SMetaEntry me = {0};
  tDecoderInit(&dc, pVal, vLen);
  metaDecodeEntry(&dc, &me);
501
  metaDebug("tmqsnap get meta uid:%" PRIi64 " name:%s index:%d", *uid, me.name, ctx->index - 1);
wmmhello's avatar
wmmhello 已提交
502

503 504
  if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) ||
      (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
wmmhello's avatar
wmmhello 已提交
505 506 507 508 509 510 511 512 513 514 515
    SVCreateStbReq req = {0};
    req.name = me.name;
    req.suid = me.uid;
    req.schemaRow = me.stbEntry.schemaRow;
    req.schemaTag = me.stbEntry.schemaTag;
    req.schemaRow.version = 1;
    req.schemaTag.version = 1;

    ret = buildSuperTableInfo(&req, pBuf, contLen);
    *type = TDMT_VND_CREATE_STB;

516 517 518 519
  } else if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) ||
             (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid)) {
    STableInfoForChildTable* data =
        (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
520 521 522 523 524
    if (!data) {
      metaError("meta/snap: null data");
      return TSDB_CODE_FAILED;
    }

wmmhello's avatar
wmmhello 已提交
525 526
    SVCreateTbReq req = {0};

wmmhello's avatar
wmmhello 已提交
527
    req.type = TSDB_CHILD_TABLE;
wmmhello's avatar
wmmhello 已提交
528 529 530 531
    req.name = me.name;
    req.uid = me.uid;
    req.commentLen = -1;
    req.ctb.suid = me.ctbEntry.suid;
wmmhello's avatar
wmmhello 已提交
532
    req.ctb.tagNum = data->tagRow->nCols;
533
    req.ctb.stbName = data->tableName;
wmmhello's avatar
wmmhello 已提交
534

wmmhello's avatar
wmmhello 已提交
535
    SArray* tagName = taosArrayInit(req.ctb.tagNum, TSDB_COL_NAME_LEN);
536 537
    STag*   p = (STag*)me.ctbEntry.pTags;
    if (tTagIsJson(p)) {
wmmhello's avatar
wmmhello 已提交
538 539 540 541
      if (p->nTag != 0) {
        SSchema* schema = &data->tagRow->pSchema[0];
        taosArrayPush(tagName, schema->name);
      }
542
    } else {
wmmhello's avatar
wmmhello 已提交
543 544
      SArray* pTagVals = NULL;
      if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
545 546
        metaError("meta/snap: tag to val array failed.");
        return TSDB_CODE_FAILED;
wmmhello's avatar
wmmhello 已提交
547 548 549 550
      }
      int16_t nCols = taosArrayGetSize(pTagVals);
      for (int j = 0; j < nCols; ++j) {
        STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
551 552 553
        for (int i = 0; i < data->tagRow->nCols; i++) {
          SSchema* schema = &data->tagRow->pSchema[i];
          if (schema->colId == pTagVal->cid) {
wmmhello's avatar
wmmhello 已提交
554 555 556 557
            taosArrayPush(tagName, schema->name);
          }
        }
      }
558
      taosArrayDestroy(pTagVals);
wmmhello's avatar
wmmhello 已提交
559
    }
560 561 562 563 564 565 566 567 568 569 570 571 572 573 574
    //    SIdInfo* sidInfo = (SIdInfo*)taosHashGet(ctx->idVersion, &me.ctbEntry.suid, sizeof(tb_uid_t));
    //    if(sidInfo->version >= idInfo->version){
    //      // need parse tag
    //      STag* p = (STag*)me.ctbEntry.pTags;
    //      SArray* pTagVals = NULL;
    //      if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
    //      }
    //
    //      int16_t nCols = taosArrayGetSize(pTagVals);
    //      for (int j = 0; j < nCols; ++j) {
    //        STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
    //      }
    //    }else{
    req.ctb.pTag = me.ctbEntry.pTags;
    //    }
wmmhello's avatar
wmmhello 已提交
575

wmmhello's avatar
wmmhello 已提交
576
    req.ctb.tagName = tagName;
wmmhello's avatar
wmmhello 已提交
577 578
    ret = buildNormalChildTableInfo(&req, pBuf, contLen);
    *type = TDMT_VND_CREATE_TABLE;
wmmhello's avatar
wmmhello 已提交
579
    taosArrayDestroy(tagName);
580
  } else if (ctx->subType == TOPIC_SUB_TYPE__DB) {
wmmhello's avatar
wmmhello 已提交
581
    SVCreateTbReq req = {0};
wmmhello's avatar
wmmhello 已提交
582
    req.type = TSDB_NORMAL_TABLE;
wmmhello's avatar
wmmhello 已提交
583 584 585 586 587 588
    req.name = me.name;
    req.uid = me.uid;
    req.commentLen = -1;
    req.ntb.schemaRow = me.ntbEntry.schemaRow;
    ret = buildNormalChildTableInfo(&req, pBuf, contLen);
    *type = TDMT_VND_CREATE_TABLE;
589
  } else {
590 591
    metaError("meta/snap: invalid topic sub type: %" PRId8 " get meta from snap failed.", ctx->subType);
    ret = -1;
592
  }
wmmhello's avatar
wmmhello 已提交
593
  tDecoderClear(&dc);
594 595 596 597

  return ret;
}

598
SMetaTableInfo getUidfromSnapShot(SSnapContext* ctx) {
599
  SMetaTableInfo result = {0};
600 601 602
  void*          pKey = NULL;
  void*          pVal = NULL;
  int            vLen, kLen;
603

604 605
  while (1) {
    if (ctx->index >= taosArrayGetSize(ctx->idList)) {
wmmhello's avatar
wmmhello 已提交
606
      metaDebug("tmqsnap get uid info end");
607 608
      return result;
    }
wmmhello's avatar
wmmhello 已提交
609 610 611
    int64_t* uidTmp = taosArrayGet(ctx->idList, ctx->index);
    ctx->index++;
    SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, uidTmp, sizeof(tb_uid_t));
612 613 614 615
    if (!idInfo) {
      metaError("meta/snap: null idInfo");
      return result;
    }
616

617
    int32_t ret = MoveToPosition(ctx, idInfo->version, *uidTmp);
618
    if (ret != 0) {
619 620 621
      metaDebug("tmqsnap getUidfromSnapShot not exist uid:%" PRIi64 " version:%" PRIi64, *uidTmp, idInfo->version);
      continue;
    }
wmmhello's avatar
wmmhello 已提交
622
    tdbTbcGet(ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
623 624 625 626
    SDecoder   dc = {0};
    SMetaEntry me = {0};
    tDecoderInit(&dc, pVal, vLen);
    metaDecodeEntry(&dc, &me);
627
    metaDebug("tmqsnap get uid info uid:%" PRIi64 " name:%s index:%d", me.uid, me.name, ctx->index - 1);
628

629 630 631
    if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) {
      STableInfoForChildTable* data =
          (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
632 633
      result.uid = me.uid;
      result.suid = me.ctbEntry.suid;
wmmhello's avatar
wmmhello 已提交
634 635
      result.schema = tCloneSSchemaWrapper(data->schemaRow);
      strcpy(result.tbName, me.name);
636 637 638 639 640
      tDecoderClear(&dc);
      break;
    } else if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_NORMAL_TABLE) {
      result.uid = me.uid;
      result.suid = 0;
wmmhello's avatar
wmmhello 已提交
641 642
      strcpy(result.tbName, me.name);
      result.schema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow);
643 644
      tDecoderClear(&dc);
      break;
645 646 647
    } else if (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid) {
      STableInfoForChildTable* data =
          (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
648 649
      result.uid = me.uid;
      result.suid = me.ctbEntry.suid;
wmmhello's avatar
wmmhello 已提交
650 651
      strcpy(result.tbName, me.name);
      result.schema = tCloneSSchemaWrapper(data->schemaRow);
652 653
      tDecoderClear(&dc);
      break;
654
    } else {
wmmhello's avatar
wmmhello 已提交
655
      metaDebug("tmqsnap get uid continue");
656 657 658 659 660 661 662
      tDecoderClear(&dc);
      continue;
    }
  }

  return result;
}