metaSnapshot.c 18.6 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "meta.h"

H
Hongze Cheng 已提交
18
// SMetaSnapReader ========================================
H
Hongze Cheng 已提交
19
struct SMetaSnapReader {
H
Hongze Cheng 已提交
20 21 22
  SMeta*  pMeta;
  int64_t sver;
  int64_t ever;
H
Hongze Cheng 已提交
23
  TBC*    pTbc;
H
Hongze Cheng 已提交
24 25
};

H
Hongze Cheng 已提交
26
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) {
H
Hongze Cheng 已提交
27 28
  int32_t          code = 0;
  int32_t          c = 0;
H
Hongze Cheng 已提交
29
  SMetaSnapReader* pReader = NULL;
H
Hongze Cheng 已提交
30

H
Hongze Cheng 已提交
31
  // alloc
H
Hongze Cheng 已提交
32 33
  pReader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
H
Hongze Cheng 已提交
34 35 36
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
H
Hongze Cheng 已提交
37 38 39
  pReader->pMeta = pMeta;
  pReader->sver = sver;
  pReader->ever = ever;
H
Hongze Cheng 已提交
40 41

  // impl
H
Hongze Cheng 已提交
42
  code = tdbTbcOpen(pMeta->pTbDb, &pReader->pTbc, NULL);
H
Hongze Cheng 已提交
43
  if (code) {
H
Hongze Cheng 已提交
44
    taosMemoryFree(pReader);
H
Hongze Cheng 已提交
45 46 47
    goto _err;
  }

H
Hongze Cheng 已提交
48
  code = tdbTbcMoveTo(pReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c);
H
Hongze Cheng 已提交
49
  if (code) {
H
Hongze Cheng 已提交
50
    taosMemoryFree(pReader);
H
Hongze Cheng 已提交
51 52 53
    goto _err;
  }

S
Shengliang Guan 已提交
54
  metaInfo("vgId:%d, vnode snapshot meta reader opened", TD_VID(pMeta->pVnode));
H
Hongze Cheng 已提交
55 56

  *ppReader = pReader;
H
Hongze Cheng 已提交
57 58 59
  return code;

_err:
S
Shengliang Guan 已提交
60
  metaError("vgId:%d, vnode snapshot meta reader open failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
61 62
  *ppReader = NULL;
  return code;
H
Hongze Cheng 已提交
63 64
}

H
Hongze Cheng 已提交
65
int32_t metaSnapReaderClose(SMetaSnapReader** ppReader) {
H
Hongze Cheng 已提交
66 67
  int32_t code = 0;

H
Hongze Cheng 已提交
68 69 70
  tdbTbcClose((*ppReader)->pTbc);
  taosMemoryFree(*ppReader);
  *ppReader = NULL;
H
Hongze Cheng 已提交
71 72

  return code;
H
Hongze Cheng 已提交
73 74
}

H
Hongze Cheng 已提交
75
int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
H
Hongze Cheng 已提交
76
  int32_t     code = 0;
H
Hongze Cheng 已提交
77 78 79 80
  const void* pKey = NULL;
  const void* pData = NULL;
  int32_t     nKey = 0;
  int32_t     nData = 0;
H
Hongze Cheng 已提交
81
  STbDbKey    key;
H
Hongze Cheng 已提交
82

H
Hongze Cheng 已提交
83
  *ppData = NULL;
H
Hongze Cheng 已提交
84
  for (;;) {
H
Hongze Cheng 已提交
85
    if (tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData)) {
H
Hongze Cheng 已提交
86
      goto _exit;
H
Hongze Cheng 已提交
87 88
    }

H
Hongze Cheng 已提交
89 90 91 92 93 94
    key = ((STbDbKey*)pKey)[0];
    if (key.version > pReader->ever) {
      goto _exit;
    }

    if (key.version < pReader->sver) {
H
Hongze Cheng 已提交
95
      tdbTbcMoveToNext(pReader->pTbc);
H
Hongze Cheng 已提交
96 97 98
      continue;
    }

H
Hongze Cheng 已提交
99
    tdbTbcMoveToNext(pReader->pTbc);
H
Hongze Cheng 已提交
100 101 102
    break;
  }

H
Hongze Cheng 已提交
103 104 105 106
  ASSERT(pData && nData);

  *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + nData);
  if (*ppData == NULL) {
H
Hongze Cheng 已提交
107
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
108
    goto _err;
H
Hongze Cheng 已提交
109
  }
H
Hongze Cheng 已提交
110 111

  SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
W
wenzhouwww 已提交
112
  pHdr->type = SNAP_DATA_META;
H
Hongze Cheng 已提交
113 114 115
  pHdr->size = nData;
  memcpy(pHdr->data, pData, nData);

S
Shengliang Guan 已提交
116
  metaInfo("vgId:%d, vnode snapshot meta read data, version:%" PRId64 " uid:%" PRId64 " nData:%d",
H
Hongze Cheng 已提交
117
           TD_VID(pReader->pMeta->pVnode), key.version, key.uid, nData);
H
Hongze Cheng 已提交
118 119

_exit:
H
Hongze Cheng 已提交
120
  return code;
H
Hongze Cheng 已提交
121 122

_err:
S
Shengliang Guan 已提交
123
  metaError("vgId:%d, vnode snapshot meta read data failed since %s", TD_VID(pReader->pMeta->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
124
  return code;
H
Hongze Cheng 已提交
125 126
}

H
Hongze Cheng 已提交
127 128 129 130 131 132 133
// SMetaSnapWriter ========================================
struct SMetaSnapWriter {
  SMeta*  pMeta;
  int64_t sver;
  int64_t ever;
};

H
Hongze Cheng 已提交
134 135 136 137 138 139 140 141 142 143 144 145 146 147
int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWriter** ppWriter) {
  int32_t          code = 0;
  SMetaSnapWriter* pWriter;

  // alloc
  pWriter = (SMetaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
  if (pWriter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pWriter->pMeta = pMeta;
  pWriter->sver = sver;
  pWriter->ever = ever;

148
  metaBegin(pMeta, 1);
H
Hongze Cheng 已提交
149

H
Hongze Cheng 已提交
150 151 152 153
  *ppWriter = pWriter;
  return code;

_err:
S
Shengliang Guan 已提交
154
  metaError("vgId:%d, meta snapshot writer open failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
155 156 157 158 159 160 161 162 163
  *ppWriter = NULL;
  return code;
}

int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) {
  int32_t          code = 0;
  SMetaSnapWriter* pWriter = *ppWriter;

  if (rollback) {
H
Hongze Cheng 已提交
164
    ASSERT(0);
H
Hongze Cheng 已提交
165
  } else {
166
    code = metaCommit(pWriter->pMeta, pWriter->pMeta->txn);
H
Hongze Cheng 已提交
167
    if (code) goto _err;
168
    code = metaFinishCommit(pWriter->pMeta, pWriter->pMeta->txn);
169
    if (code) goto _err;
H
Hongze Cheng 已提交
170 171 172 173 174 175 176
  }
  taosMemoryFree(pWriter);
  *ppWriter = NULL;

  return code;

_err:
S
Shengliang Guan 已提交
177
  metaError("vgId:%d, meta snapshot writer close failed since %s", TD_VID(pWriter->pMeta->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
178 179 180
  return code;
}

H
Hongze Cheng 已提交
181
int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
H
Hongze Cheng 已提交
182 183 184 185 186
  int32_t    code = 0;
  SMeta*     pMeta = pWriter->pMeta;
  SMetaEntry metaEntry = {0};
  SDecoder*  pDecoder = &(SDecoder){0};

H
Hongze Cheng 已提交
187
  tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
H
Hongze Cheng 已提交
188 189 190 191 192
  metaDecodeEntry(pDecoder, &metaEntry);

  code = metaHandleEntry(pMeta, &metaEntry);
  if (code) goto _err;

H
Hongze Cheng 已提交
193
  tDecoderClear(pDecoder);
H
Hongze Cheng 已提交
194 195 196
  return code;

_err:
S
Shengliang Guan 已提交
197
  metaError("vgId:%d, vnode snapshot meta write failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
198
  return code;
W
wenzhouwww 已提交
199
}
200

201 202 203 204 205
typedef struct STableInfoForChildTable {
  char*           tableName;
  SSchemaWrapper* schemaRow;
  SSchemaWrapper* tagRow;
} STableInfoForChildTable;
206 207 208

static void destroySTableInfoForChildTable(void* data) {
  STableInfoForChildTable* pData = (STableInfoForChildTable*)data;
wmmhello's avatar
wmmhello 已提交
209
  taosMemoryFree(pData->tableName);
210
  tDeleteSSchemaWrapper(pData->schemaRow);
wmmhello's avatar
wmmhello 已提交
211
  tDeleteSSchemaWrapper(pData->tagRow);
212 213
}

214
static void MoveToSnapShotVersion(SSnapContext* ctx) {
wmmhello's avatar
wmmhello 已提交
215 216 217
  tdbTbcClose(ctx->pCur);
  tdbTbcOpen(ctx->pMeta->pTbDb, &ctx->pCur, NULL);
  STbDbKey key = {.version = ctx->snapVersion, .uid = INT64_MAX};
218
  int      c = 0;
wmmhello's avatar
wmmhello 已提交
219
  tdbTbcMoveTo(ctx->pCur, &key, sizeof(key), &c);
220
  if (c < 0) {
wmmhello's avatar
wmmhello 已提交
221 222 223 224
    tdbTbcMoveToPrev(ctx->pCur);
  }
}

225
static int32_t MoveToPosition(SSnapContext* ctx, int64_t ver, int64_t uid) {
wmmhello's avatar
wmmhello 已提交
226 227 228
  tdbTbcClose(ctx->pCur);
  tdbTbcOpen(ctx->pMeta->pTbDb, &ctx->pCur, NULL);
  STbDbKey key = {.version = ver, .uid = uid};
229
  int      c = 0;
wmmhello's avatar
wmmhello 已提交
230
  tdbTbcMoveTo(ctx->pCur, &key, sizeof(key), &c);
231
  return c;
wmmhello's avatar
wmmhello 已提交
232 233
}

234
static void MoveToFirst(SSnapContext* ctx) {
wmmhello's avatar
wmmhello 已提交
235 236 237 238 239
  tdbTbcClose(ctx->pCur);
  tdbTbcOpen(ctx->pMeta->pTbDb, &ctx->pCur, NULL);
  tdbTbcMoveToFirst(ctx->pCur);
}

240
static void saveSuperTableInfoForChildTable(SMetaEntry* me, SHashObj* suidInfo) {
wmmhello's avatar
wmmhello 已提交
241
  STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(suidInfo, &me->uid, sizeof(tb_uid_t));
242
  if (data) {
wmmhello's avatar
wmmhello 已提交
243 244 245 246
    return;
  }
  STableInfoForChildTable dataTmp = {0};
  dataTmp.tableName = strdup(me->name);
wmmhello's avatar
wmmhello 已提交
247

wmmhello's avatar
wmmhello 已提交
248 249 250 251 252
  dataTmp.schemaRow = tCloneSSchemaWrapper(&me->stbEntry.schemaRow);
  dataTmp.tagRow = tCloneSSchemaWrapper(&me->stbEntry.schemaTag);
  taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable));
}

253 254
int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta,
                         SSnapContext** ctxRet) {
wmmhello's avatar
wmmhello 已提交
255
  SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext));
256
  if (ctx == NULL) return -1;
wmmhello's avatar
wmmhello 已提交
257
  *ctxRet = ctx;
258 259 260 261 262
  ctx->pMeta = pMeta;
  ctx->snapVersion = snapVersion;
  ctx->suid = suid;
  ctx->subType = subType;
  ctx->queryMetaOrData = withMeta;
wmmhello's avatar
wmmhello 已提交
263
  ctx->withMeta = withMeta;
264
  ctx->idVersion = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
265
  if (ctx->idVersion == NULL) {
266 267 268 269
    return -1;
  }

  ctx->suidInfo = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
270
  if (ctx->suidInfo == NULL) {
271 272 273 274
    return -1;
  }
  taosHashSetFreeFp(ctx->suidInfo, destroySTableInfoForChildTable);

wmmhello's avatar
wmmhello 已提交
275 276
  ctx->index = 0;
  ctx->idList = taosArrayInit(100, sizeof(int64_t));
277 278
  void* pKey = NULL;
  void* pVal = NULL;
wmmhello's avatar
wmmhello 已提交
279
  int   vLen = 0, kLen = 0;
280

wmmhello's avatar
wmmhello 已提交
281 282
  metaDebug("tmqsnap init snapVersion:%" PRIi64, ctx->snapVersion);
  MoveToFirst(ctx);
283
  while (1) {
wmmhello's avatar
wmmhello 已提交
284 285
    int32_t ret = tdbTbcNext(ctx->pCur, &pKey, &kLen, &pVal, &vLen);
    if (ret < 0) break;
286
    STbDbKey* tmp = (STbDbKey*)pKey;
wmmhello's avatar
wmmhello 已提交
287 288
    if (tmp->version > ctx->snapVersion) break;

289
    SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
290
    if (idData) {
291 292 293
      continue;
    }

294 295
    if (tdbTbGet(pMeta->pUidIdx, &tmp->uid, sizeof(tb_uid_t), NULL, NULL) <
        0) {  // check if table exist for now, need optimize later
296 297 298
      continue;
    }

wmmhello's avatar
wmmhello 已提交
299 300 301 302
    SDecoder   dc = {0};
    SMetaEntry me = {0};
    tDecoderInit(&dc, pVal, vLen);
    metaDecodeEntry(&dc, &me);
303
    if (ctx->subType == TOPIC_SUB_TYPE__TABLE) {
wmmhello's avatar
wmmhello 已提交
304
      if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
305
          (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) {
306
        tDecoderClear(&dc);
wmmhello's avatar
wmmhello 已提交
307 308 309
        continue;
      }
    }
310 311 312 313 314 315 316

    taosArrayPush(ctx->idList, &tmp->uid);
    metaDebug("tmqsnap init idlist name:%s, uid:%" PRIi64, me.name, tmp->uid);
    SIdInfo info = {0};
    taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo));

    tDecoderClear(&dc);
wmmhello's avatar
wmmhello 已提交
317 318 319 320
  }
  taosHashClear(ctx->idVersion);

  MoveToSnapShotVersion(ctx);
321
  while (1) {
wmmhello's avatar
wmmhello 已提交
322
    int32_t ret = tdbTbcPrev(ctx->pCur, &pKey, &kLen, &pVal, &vLen);
323 324
    if (ret < 0) break;

325 326 327
    STbDbKey* tmp = (STbDbKey*)pKey;
    SIdInfo*  idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
    if (idData) {
328
      continue;
wmmhello's avatar
wmmhello 已提交
329
    }
330 331
    SIdInfo info = {.version = tmp->version, .index = 0};
    taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo));
wmmhello's avatar
wmmhello 已提交
332 333 334 335 336

    SDecoder   dc = {0};
    SMetaEntry me = {0};
    tDecoderInit(&dc, pVal, vLen);
    metaDecodeEntry(&dc, &me);
337
    if (ctx->subType == TOPIC_SUB_TYPE__TABLE) {
wmmhello's avatar
wmmhello 已提交
338
      if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
339
          (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) {
340
        tDecoderClear(&dc);
wmmhello's avatar
wmmhello 已提交
341 342 343 344
        continue;
      }
    }

345 346
    if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) ||
        (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
wmmhello's avatar
wmmhello 已提交
347 348 349 350 351
      saveSuperTableInfoForChildTable(&me, ctx->suidInfo);
    }
    tDecoderClear(&dc);
  }

352 353
  for (int i = 0; i < taosArrayGetSize(ctx->idList); i++) {
    int64_t* uid = taosArrayGet(ctx->idList, i);
wmmhello's avatar
wmmhello 已提交
354 355 356
    SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, uid, sizeof(int64_t));
    ASSERT(idData);
    idData->index = i;
357 358
    metaDebug("tmqsnap init idVersion uid:%" PRIi64 " version:%" PRIi64 " index:%d", *uid, idData->version,
              idData->index);
359
  }
wmmhello's avatar
wmmhello 已提交
360

361 362
  tdbFree(pKey);
  tdbFree(pVal);
363 364 365
  return TDB_CODE_SUCCESS;
}

366
int32_t destroySnapContext(SSnapContext* ctx) {
367
  tdbTbcClose(ctx->pCur);
wmmhello's avatar
wmmhello 已提交
368
  taosArrayDestroy(ctx->idList);
369 370
  taosHashCleanup(ctx->idVersion);
  taosHashCleanup(ctx->suidInfo);
wmmhello's avatar
wmmhello 已提交
371
  taosMemoryFree(ctx);
372 373 374
  return 0;
}

375 376
static int32_t buildNormalChildTableInfo(SVCreateTbReq* req, void** pBuf, int32_t* contLen) {
  int32_t            ret = 0;
wmmhello's avatar
wmmhello 已提交
377
  SVCreateTbBatchReq reqs = {0};
378 379

  reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
380
  if (NULL == reqs.pArray) {
381 382 383
    ret = -1;
    goto end;
  }
wmmhello's avatar
wmmhello 已提交
384
  taosArrayPush(reqs.pArray, req);
385 386 387
  reqs.nReqs = 1;

  tEncodeSize(tEncodeSVCreateTbBatchReq, &reqs, *contLen, ret);
388
  if (ret < 0) {
389 390 391 392 393 394 395 396 397 398
    ret = -1;
    goto end;
  }
  *contLen += sizeof(SMsgHead);
  *pBuf = taosMemoryMalloc(*contLen);
  if (NULL == *pBuf) {
    ret = -1;
    goto end;
  }
  SEncoder coder = {0};
wmmhello's avatar
wmmhello 已提交
399
  tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), *contLen);
400 401 402 403 404 405 406 407 408 409 410 411 412
  if (tEncodeSVCreateTbBatchReq(&coder, &reqs) < 0) {
    taosMemoryFreeClear(*pBuf);
    tEncoderClear(&coder);
    ret = -1;
    goto end;
  }
  tEncoderClear(&coder);

end:
  taosArrayDestroy(reqs.pArray);
  return ret;
}

413
static int32_t buildSuperTableInfo(SVCreateStbReq* req, void** pBuf, int32_t* contLen) {
414 415 416 417 418 419 420 421 422 423 424 425
  int32_t ret = 0;
  tEncodeSize(tEncodeSVCreateStbReq, req, *contLen, ret);
  if (ret < 0) {
    return -1;
  }

  *contLen += sizeof(SMsgHead);
  *pBuf = taosMemoryMalloc(*contLen);
  if (NULL == *pBuf) {
    return -1;
  }

426
  SEncoder encoder = {0};
wmmhello's avatar
wmmhello 已提交
427
  tEncoderInit(&encoder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), *contLen);
428 429 430 431 432 433 434 435 436
  if (tEncodeSVCreateStbReq(&encoder, req) < 0) {
    taosMemoryFreeClear(*pBuf);
    tEncoderClear(&encoder);
    return -1;
  }
  tEncoderClear(&encoder);
  return 0;
}

437
int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) {
438
  int c = 0;
wmmhello's avatar
wmmhello 已提交
439

440
  if (uid == 0) {
wmmhello's avatar
wmmhello 已提交
441
    ctx->index = 0;
442 443 444
    return c;
  }

wmmhello's avatar
wmmhello 已提交
445
  SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, &uid, sizeof(tb_uid_t));
446
  if (!idInfo) {
447 448 449
    return -1;
  }

wmmhello's avatar
wmmhello 已提交
450
  ctx->index = idInfo->index;
451 452 453 454

  return c;
}

455
int32_t getMetafromSnapShot(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid) {
456
  int32_t ret = 0;
457 458 459
  void*   pKey = NULL;
  void*   pVal = NULL;
  int     vLen = 0, kLen = 0;
460

461 462
  while (1) {
    if (ctx->index >= taosArrayGetSize(ctx->idList)) {
463 464
      metaDebug("tmqsnap get meta end");
      ctx->index = 0;
465
      ctx->queryMetaOrData = false;  // change to get data
466 467
      return 0;
    }
468

469 470 471 472 473 474 475
    int64_t* uidTmp = taosArrayGet(ctx->idList, ctx->index);
    ctx->index++;
    SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, uidTmp, sizeof(tb_uid_t));
    ASSERT(idInfo);

    *uid = *uidTmp;
    ret = MoveToPosition(ctx, idInfo->version, *uidTmp);
476
    if (ret == 0) {
477 478 479 480
      break;
    }
    metaDebug("tmqsnap get meta not exist uid:%" PRIi64 " version:%" PRIi64, *uid, idInfo->version);
  }
wmmhello's avatar
wmmhello 已提交
481 482 483 484 485 486

  tdbTbcGet(ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
  SDecoder   dc = {0};
  SMetaEntry me = {0};
  tDecoderInit(&dc, pVal, vLen);
  metaDecodeEntry(&dc, &me);
487
  metaDebug("tmqsnap get meta uid:%" PRIi64 " name:%s index:%d", *uid, me.name, ctx->index - 1);
wmmhello's avatar
wmmhello 已提交
488

489 490
  if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) ||
      (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
wmmhello's avatar
wmmhello 已提交
491 492 493 494 495 496 497 498 499 500 501
    SVCreateStbReq req = {0};
    req.name = me.name;
    req.suid = me.uid;
    req.schemaRow = me.stbEntry.schemaRow;
    req.schemaTag = me.stbEntry.schemaTag;
    req.schemaRow.version = 1;
    req.schemaTag.version = 1;

    ret = buildSuperTableInfo(&req, pBuf, contLen);
    *type = TDMT_VND_CREATE_STB;

502 503 504 505
  } else if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) ||
             (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid)) {
    STableInfoForChildTable* data =
        (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
wmmhello's avatar
wmmhello 已提交
506 507 508
    ASSERT(data);
    SVCreateTbReq req = {0};

wmmhello's avatar
wmmhello 已提交
509
    req.type = TSDB_CHILD_TABLE;
wmmhello's avatar
wmmhello 已提交
510 511 512 513
    req.name = me.name;
    req.uid = me.uid;
    req.commentLen = -1;
    req.ctb.suid = me.ctbEntry.suid;
wmmhello's avatar
wmmhello 已提交
514
    req.ctb.tagNum = data->tagRow->nCols;
515
    req.ctb.stbName = data->tableName;
wmmhello's avatar
wmmhello 已提交
516

wmmhello's avatar
wmmhello 已提交
517
    SArray* tagName = taosArrayInit(req.ctb.tagNum, TSDB_COL_NAME_LEN);
518 519
    STag*   p = (STag*)me.ctbEntry.pTags;
    if (tTagIsJson(p)) {
wmmhello's avatar
wmmhello 已提交
520 521 522 523
      if (p->nTag != 0) {
        SSchema* schema = &data->tagRow->pSchema[0];
        taosArrayPush(tagName, schema->name);
      }
524
    } else {
wmmhello's avatar
wmmhello 已提交
525 526 527 528 529 530 531
      SArray* pTagVals = NULL;
      if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
        ASSERT(0);
      }
      int16_t nCols = taosArrayGetSize(pTagVals);
      for (int j = 0; j < nCols; ++j) {
        STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
532 533 534
        for (int i = 0; i < data->tagRow->nCols; i++) {
          SSchema* schema = &data->tagRow->pSchema[i];
          if (schema->colId == pTagVal->cid) {
wmmhello's avatar
wmmhello 已提交
535 536 537 538
            taosArrayPush(tagName, schema->name);
          }
        }
      }
539
      taosArrayDestroy(pTagVals);
wmmhello's avatar
wmmhello 已提交
540
    }
541 542 543 544 545 546 547 548 549 550 551 552 553 554 555
    //    SIdInfo* sidInfo = (SIdInfo*)taosHashGet(ctx->idVersion, &me.ctbEntry.suid, sizeof(tb_uid_t));
    //    if(sidInfo->version >= idInfo->version){
    //      // need parse tag
    //      STag* p = (STag*)me.ctbEntry.pTags;
    //      SArray* pTagVals = NULL;
    //      if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
    //      }
    //
    //      int16_t nCols = taosArrayGetSize(pTagVals);
    //      for (int j = 0; j < nCols; ++j) {
    //        STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
    //      }
    //    }else{
    req.ctb.pTag = me.ctbEntry.pTags;
    //    }
wmmhello's avatar
wmmhello 已提交
556

wmmhello's avatar
wmmhello 已提交
557
    req.ctb.tagName = tagName;
wmmhello's avatar
wmmhello 已提交
558 559
    ret = buildNormalChildTableInfo(&req, pBuf, contLen);
    *type = TDMT_VND_CREATE_TABLE;
wmmhello's avatar
wmmhello 已提交
560
    taosArrayDestroy(tagName);
561
  } else if (ctx->subType == TOPIC_SUB_TYPE__DB) {
wmmhello's avatar
wmmhello 已提交
562
    SVCreateTbReq req = {0};
wmmhello's avatar
wmmhello 已提交
563
    req.type = TSDB_NORMAL_TABLE;
wmmhello's avatar
wmmhello 已提交
564 565 566 567 568 569
    req.name = me.name;
    req.uid = me.uid;
    req.commentLen = -1;
    req.ntb.schemaRow = me.ntbEntry.schemaRow;
    ret = buildNormalChildTableInfo(&req, pBuf, contLen);
    *type = TDMT_VND_CREATE_TABLE;
570
  } else {
wmmhello's avatar
wmmhello 已提交
571
    ASSERT(0);
572
  }
wmmhello's avatar
wmmhello 已提交
573
  tDecoderClear(&dc);
574 575 576 577

  return ret;
}

578
SMetaTableInfo getUidfromSnapShot(SSnapContext* ctx) {
579
  SMetaTableInfo result = {0};
580 581 582
  void*          pKey = NULL;
  void*          pVal = NULL;
  int            vLen, kLen;
583

584 585
  while (1) {
    if (ctx->index >= taosArrayGetSize(ctx->idList)) {
wmmhello's avatar
wmmhello 已提交
586
      metaDebug("tmqsnap get uid info end");
587 588
      return result;
    }
wmmhello's avatar
wmmhello 已提交
589 590 591 592
    int64_t* uidTmp = taosArrayGet(ctx->idList, ctx->index);
    ctx->index++;
    SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, uidTmp, sizeof(tb_uid_t));
    ASSERT(idInfo);
593

594
    int32_t ret = MoveToPosition(ctx, idInfo->version, *uidTmp);
595
    if (ret != 0) {
596 597 598
      metaDebug("tmqsnap getUidfromSnapShot not exist uid:%" PRIi64 " version:%" PRIi64, *uidTmp, idInfo->version);
      continue;
    }
wmmhello's avatar
wmmhello 已提交
599
    tdbTbcGet(ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
600 601 602 603
    SDecoder   dc = {0};
    SMetaEntry me = {0};
    tDecoderInit(&dc, pVal, vLen);
    metaDecodeEntry(&dc, &me);
604
    metaDebug("tmqsnap get uid info uid:%" PRIi64 " name:%s index:%d", me.uid, me.name, ctx->index - 1);
605

606 607 608
    if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) {
      STableInfoForChildTable* data =
          (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
609 610
      result.uid = me.uid;
      result.suid = me.ctbEntry.suid;
wmmhello's avatar
wmmhello 已提交
611 612
      result.schema = tCloneSSchemaWrapper(data->schemaRow);
      strcpy(result.tbName, me.name);
613 614 615 616 617
      tDecoderClear(&dc);
      break;
    } else if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_NORMAL_TABLE) {
      result.uid = me.uid;
      result.suid = 0;
wmmhello's avatar
wmmhello 已提交
618 619
      strcpy(result.tbName, me.name);
      result.schema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow);
620 621
      tDecoderClear(&dc);
      break;
622 623 624
    } else if (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid) {
      STableInfoForChildTable* data =
          (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
625 626
      result.uid = me.uid;
      result.suid = me.ctbEntry.suid;
wmmhello's avatar
wmmhello 已提交
627 628
      strcpy(result.tbName, me.name);
      result.schema = tCloneSSchemaWrapper(data->schemaRow);
629 630
      tDecoderClear(&dc);
      break;
631
    } else {
wmmhello's avatar
wmmhello 已提交
632
      metaDebug("tmqsnap get uid continue");
633 634 635 636 637 638 639
      tDecoderClear(&dc);
      continue;
    }
  }

  return result;
}