metaSnapshot.c 19.7 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "meta.h"

H
Hongze Cheng 已提交
18
// SMetaSnapReader ========================================
H
Hongze Cheng 已提交
19
struct SMetaSnapReader {
H
Hongze Cheng 已提交
20 21 22
  SMeta*  pMeta;
  int64_t sver;
  int64_t ever;
H
Hongze Cheng 已提交
23
  TBC*    pTbc;
H
Hongze Cheng 已提交
24 25
};

H
Hongze Cheng 已提交
26
/**
 * Create a snapshot reader positioned at the first table-DB key
 * {version = sver, uid = INT64_MIN}.
 *
 * @param pMeta     meta instance to read from
 * @param sver      first version of the range to read
 * @param ever      last version of the range to read
 * @param ppReader  receives the new reader on success, NULL on failure
 * @return 0 on success; TSDB_CODE_OUT_OF_MEMORY or the code returned by the
 *         failing tdb call otherwise
 */
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) {
  int32_t          code = 0;
  int32_t          c = 0;
  SMetaSnapReader* pReader = NULL;

  // alloc
  pReader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pReader->pMeta = pMeta;
  pReader->sver = sver;
  pReader->ever = ever;

  // impl
  code = tdbTbcOpen(pMeta->pTbDb, &pReader->pTbc, NULL);
  if (code) {
    // Nothing was opened, so make sure the cleanup below does not try to close it.
    pReader->pTbc = NULL;
    goto _err;
  }

  code = tdbTbcMoveTo(pReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c);
  if (code) {
    goto _err;
  }

  metaInfo("vgId:%d, vnode snapshot meta reader opened", TD_VID(pMeta->pVnode));

  *ppReader = pReader;
  return code;

_err:
  metaError("vgId:%d, vnode snapshot meta reader open failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
  if (pReader != NULL) {
    // Release the cursor as well as the reader; freeing only the reader leaked
    // the cursor when positioning failed.
    if (pReader->pTbc != NULL) {
      tdbTbcClose(pReader->pTbc);
    }
    taosMemoryFree(pReader);
  }
  *ppReader = NULL;
  return code;
}

H
Hongze Cheng 已提交
65
/**
 * Close the reader's cursor, free the reader and reset the caller's pointer.
 *
 * A NULL ppReader or NULL *ppReader is accepted and treated as "nothing to
 * close"; metaSnapReaderOpen leaves *ppReader NULL when it fails.
 *
 * @param ppReader  address of the reader pointer; set to NULL on return
 * @return always 0
 */
int32_t metaSnapReaderClose(SMetaSnapReader** ppReader) {
  int32_t code = 0;

  if (ppReader == NULL || *ppReader == NULL) {
    return code;
  }

  tdbTbcClose((*ppReader)->pTbc);
  taosMemoryFree(*ppReader);
  *ppReader = NULL;

  return code;
}

H
Hongze Cheng 已提交
75
/**
 * Read the next table-DB entry whose version lies in [sver, ever] and return
 * it wrapped in a freshly allocated SSnapDataHdr.
 *
 * @param pReader  reader created by metaSnapReaderOpen
 * @param ppData   receives the allocated block; left NULL when the cursor has
 *                 no further entry, the next entry is past `ever`, or the
 *                 entry has no payload
 * @return 0 in all cases except allocation failure, which returns
 *         TSDB_CODE_OUT_OF_MEMORY
 */
int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
  int32_t     code = 0;
  const void* pEntryKey = NULL;
  const void* pEntryVal = NULL;
  int32_t     keyLen = 0;
  int32_t     valLen = 0;
  STbDbKey    key;

  *ppData = NULL;

  while (1) {
    if (tdbTbcGet(pReader->pTbc, &pEntryKey, &keyLen, &pEntryVal, &valLen)) {
      return code;  // cursor has nothing to deliver
    }

    key = *(STbDbKey*)pEntryKey;
    if (key.version > pReader->ever) {
      return code;  // past the end of the requested range; cursor is not advanced
    }

    // The entry is consumed either way: skipped when too old, returned otherwise.
    tdbTbcMoveToNext(pReader->pTbc);
    if (key.version >= pReader->sver) {
      break;
    }
  }

  if (pEntryVal == NULL || valLen == 0) {
    metaError("meta/snap: invalide nData: %" PRId32 " meta snap read failed.", valLen);
    return code;
  }

  *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + valLen);
  if (*ppData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    metaError("vgId:%d, vnode snapshot meta read data failed since %s", TD_VID(pReader->pMeta->pVnode),
              tstrerror(code));
    return code;
  }

  SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
  pHdr->type = SNAP_DATA_META;
  pHdr->size = valLen;
  memcpy(pHdr->data, pEntryVal, valLen);

  metaDebug("vgId:%d, vnode snapshot meta read data, version:%" PRId64 " uid:%" PRId64 " blockLen:%d",
            TD_VID(pReader->pMeta->pVnode), key.version, key.uid, valLen);

  return code;
}

H
Hongze Cheng 已提交
130 131 132 133 134 135 136
// SMetaSnapWriter ========================================
// State of one snapshot write pass into a meta instance.
struct SMetaSnapWriter {
  SMeta* pMeta;  // meta instance that receives the decoded entries
  int64_t sver;  // start version passed to metaSnapWriterOpen
  int64_t ever;  // end version passed to metaSnapWriterOpen
};

H
Hongze Cheng 已提交
137 138 139 140 141 142 143 144 145 146 147 148 149 150
/**
 * Create a snapshot writer and begin a meta transaction for it.
 *
 * @param pMeta     meta instance to write into
 * @param sver      start version, stored in the writer
 * @param ever      end version, stored in the writer
 * @param ppWriter  receives the new writer on success, NULL on failure
 * @return 0 on success; TSDB_CODE_OUT_OF_MEMORY when the writer cannot be
 *         allocated, or the non-zero result of metaBegin
 */
int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWriter** ppWriter) {
  int32_t          code = 0;
  SMetaSnapWriter* pWriter = NULL;

  // alloc
  pWriter = (SMetaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
  if (pWriter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pWriter->pMeta = pMeta;
  pWriter->sver = sver;
  pWriter->ever = ever;

  // Do not hand out a writer whose transaction failed to start.
  code = metaBegin(pMeta, META_BEGIN_HEAP_NIL);
  if (code) {
    goto _err;
  }

  *ppWriter = pWriter;
  return code;

_err:
  metaError("vgId:%d, meta snapshot writer open failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
  if (pWriter != NULL) {
    taosMemoryFree(pWriter);
  }
  *ppWriter = NULL;
  return code;
}

/**
 * Finish a snapshot write pass: abort the meta transaction when `rollback` is
 * non-zero, otherwise commit it and finish the commit.
 *
 * On success the writer is freed and *ppWriter is set to NULL. On failure the
 * writer is left untouched and the failing step's code is returned.
 *
 * @param ppWriter  address of the writer pointer
 * @param rollback  non-zero to abort instead of commit
 * @return 0 on success, otherwise the code of the failing meta call
 */
int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) {
  SMetaSnapWriter* pWriter = *ppWriter;
  SMeta*           pMeta = pWriter->pMeta;
  int32_t          code = 0;

  if (rollback) {
    metaInfo("vgId:%d, meta snapshot writer close and rollback start ", TD_VID(pMeta->pVnode));
    code = metaAbort(pMeta);
    metaInfo("vgId:%d, meta snapshot writer close and rollback finished, code:0x%x", TD_VID(pMeta->pVnode), code);
  } else {
    code = metaCommit(pMeta, pMeta->txn);
    if (code == 0) {
      code = metaFinishCommit(pMeta, pMeta->txn);
    }
  }

  if (code != 0) {
    metaError("vgId:%d, meta snapshot writer close failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
    return code;
  }

  taosMemoryFree(pWriter);
  *ppWriter = NULL;
  return code;
}

H
Hongze Cheng 已提交
188
/**
 * Decode one snapshot block (SSnapDataHdr followed by an encoded meta entry)
 * and apply the entry to the writer's meta instance.
 *
 * @param pWriter  writer created by metaSnapWriterOpen
 * @param pData    block start, including the SSnapDataHdr
 * @param nData    total block length in bytes, header included
 * @return 0 on success, otherwise the code of the failing decode/handle step
 */
int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
  int32_t    code = 0;
  int32_t    line = 0;
  SMeta*     pMeta = pWriter->pMeta;
  SMetaEntry entry = {0};
  SDecoder   decoder = {0};

  // The payload starts right after the snapshot header.
  tDecoderInit(&decoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));

  code = metaDecodeEntry(&decoder, &entry);
  VND_CHECK_CODE(code, line, _err);

  code = metaHandleEntry(pMeta, &entry);
  VND_CHECK_CODE(code, line, _err);

  tDecoderClear(&decoder);
  return code;

_err:
  tDecoderClear(&decoder);
  metaError("vgId:%d, vnode snapshot meta write failed since %s at line:%d", TD_VID(pMeta->pVnode), terrstr(), line);
  return code;
}
210

211 212 213 214 215
// Owned copy of the super-table attributes needed when describing its child
// tables; released by destroySTableInfoForChildTable.
typedef struct STableInfoForChildTable {
  char* tableName;            // copy of the super table name
  SSchemaWrapper* schemaRow;  // clone of the super table column schema
  SSchemaWrapper* tagRow;     // clone of the super table tag schema
} STableInfoForChildTable;
216 217 218

static void destroySTableInfoForChildTable(void* data) {
  STableInfoForChildTable* pData = (STableInfoForChildTable*)data;
wmmhello's avatar
wmmhello 已提交
219
  taosMemoryFree(pData->tableName);
220 221
  tDeleteSchemaWrapper(pData->schemaRow);
  tDeleteSchemaWrapper(pData->tagRow);
222 223
}

224
// Reopen the context cursor and seek to {snapVersion, INT64_MAX}; when the
// seek reports a negative comparison the cursor is stepped back by one entry.
// NOTE(review): the results of tdbTbcOpen/tdbTbcMoveTo are not checked here.
static void MoveToSnapShotVersion(SSnapContext* ctx) {
  int      cmp = 0;
  STbDbKey target = {.version = ctx->snapVersion, .uid = INT64_MAX};

  tdbTbcClose((TBC*)ctx->pCur);
  tdbTbcOpen(ctx->pMeta->pTbDb, (TBC**)&ctx->pCur, NULL);

  tdbTbcMoveTo((TBC*)ctx->pCur, &target, sizeof(target), &cmp);
  if (cmp < 0) {
    tdbTbcMoveToPrev((TBC*)ctx->pCur);
  }
}

235
// Reopen the context cursor and seek to {ver, uid}. Returns the comparison
// value reported by tdbTbcMoveTo; callers in this file treat 0 as "entry found".
static int32_t MoveToPosition(SSnapContext* ctx, int64_t ver, int64_t uid) {
  int      cmp = 0;
  STbDbKey target = {.version = ver, .uid = uid};

  tdbTbcClose((TBC*)ctx->pCur);
  tdbTbcOpen(ctx->pMeta->pTbDb, (TBC**)&ctx->pCur, NULL);
  tdbTbcMoveTo((TBC*)ctx->pCur, &target, sizeof(target), &cmp);

  return cmp;
}

244
// Reopen the context cursor on the table DB and place it on the first entry.
static void MoveToFirst(SSnapContext* ctx) {
  TBC** ppCursor = (TBC**)&ctx->pCur;

  tdbTbcClose(*ppCursor);
  tdbTbcOpen(ctx->pMeta->pTbDb, ppCursor, NULL);
  tdbTbcMoveToFirst(*ppCursor);
}

250
// Remember the name, column schema and tag schema of super table `me` in
// `suidInfo`, keyed by its uid. Does nothing when the uid is already present.
//
// The entry is only stored when all three copies were made and the hash insert
// succeeded; otherwise the partial copies are released, so later lookups see a
// missing entry rather than one holding NULL members.
static void saveSuperTableInfoForChildTable(SMetaEntry* me, SHashObj* suidInfo) {
  if (taosHashGet(suidInfo, &me->uid, sizeof(tb_uid_t)) != NULL) {
    return;
  }

  STableInfoForChildTable dataTmp = {0};
  dataTmp.tableName = taosStrdup(me->name);
  dataTmp.schemaRow = tCloneSSchemaWrapper(&me->stbEntry.schemaRow);
  dataTmp.tagRow = tCloneSSchemaWrapper(&me->stbEntry.schemaTag);

  if (dataTmp.tableName == NULL || dataTmp.schemaRow == NULL || dataTmp.tagRow == NULL) {
    metaError("meta/snap: failed to copy super table info, uid:%" PRIi64, me->uid);
    goto _fail;
  }

  if (taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable)) != 0) {
    metaError("meta/snap: failed to save super table info, uid:%" PRIi64, me->uid);
    goto _fail;
  }
  return;  // the hash now owns the copies (freed through its free callback)

_fail:
  if (dataTmp.tableName != NULL) taosMemoryFree(dataTmp.tableName);
  if (dataTmp.schemaRow != NULL) tDeleteSchemaWrapper(dataTmp.schemaRow);
  if (dataTmp.tagRow != NULL) tDeleteSchemaWrapper(dataTmp.tagRow);
}

263
int32_t buildSnapContext(SVnode* pVnode, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta,
264
                         SSnapContext** ctxRet) {
wmmhello's avatar
wmmhello 已提交
265
  SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext));
266
  if (ctx == NULL) return -1;
wmmhello's avatar
wmmhello 已提交
267
  *ctxRet = ctx;
268
  ctx->pMeta = pVnode->pMeta;
269 270 271
  ctx->snapVersion = snapVersion;
  ctx->suid = suid;
  ctx->subType = subType;
272
  ctx->queryMeta = withMeta;
wmmhello's avatar
wmmhello 已提交
273
  ctx->withMeta = withMeta;
274
  ctx->idVersion = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
275
  if (ctx->idVersion == NULL) {
276 277 278 279
    return -1;
  }

  ctx->suidInfo = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
280
  if (ctx->suidInfo == NULL) {
281 282 283 284
    return -1;
  }
  taosHashSetFreeFp(ctx->suidInfo, destroySTableInfoForChildTable);

wmmhello's avatar
wmmhello 已提交
285 286
  ctx->index = 0;
  ctx->idList = taosArrayInit(100, sizeof(int64_t));
287 288
  void* pKey = NULL;
  void* pVal = NULL;
wmmhello's avatar
wmmhello 已提交
289
  int   vLen = 0, kLen = 0;
290

wmmhello's avatar
wmmhello 已提交
291 292
  metaDebug("tmqsnap init snapVersion:%" PRIi64, ctx->snapVersion);
  MoveToFirst(ctx);
293
  while (1) {
294
    int32_t ret = tdbTbcNext((TBC*)ctx->pCur, &pKey, &kLen, &pVal, &vLen);
wmmhello's avatar
wmmhello 已提交
295
    if (ret < 0) break;
296
    STbDbKey* tmp = (STbDbKey*)pKey;
wmmhello's avatar
wmmhello 已提交
297 298
    if (tmp->version > ctx->snapVersion) break;

299
    SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
300
    if (idData) {
301 302 303
      continue;
    }

304
    if (tdbTbGet(ctx->pMeta->pUidIdx, &tmp->uid, sizeof(tb_uid_t), NULL, NULL) <
305
        0) {  // check if table exist for now, need optimize later
306 307 308
      continue;
    }

wmmhello's avatar
wmmhello 已提交
309 310 311 312
    SDecoder   dc = {0};
    SMetaEntry me = {0};
    tDecoderInit(&dc, pVal, vLen);
    metaDecodeEntry(&dc, &me);
313
    if (ctx->subType == TOPIC_SUB_TYPE__TABLE) {
wmmhello's avatar
wmmhello 已提交
314
      if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
315
          (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) {
316
        tDecoderClear(&dc);
wmmhello's avatar
wmmhello 已提交
317 318 319
        continue;
      }
    }
320 321 322 323 324 325 326

    taosArrayPush(ctx->idList, &tmp->uid);
    metaDebug("tmqsnap init idlist name:%s, uid:%" PRIi64, me.name, tmp->uid);
    SIdInfo info = {0};
    taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo));

    tDecoderClear(&dc);
wmmhello's avatar
wmmhello 已提交
327 328 329 330
  }
  taosHashClear(ctx->idVersion);

  MoveToSnapShotVersion(ctx);
331
  while (1) {
332
    int32_t ret = tdbTbcPrev((TBC*)ctx->pCur, &pKey, &kLen, &pVal, &vLen);
333 334
    if (ret < 0) break;

335 336 337
    STbDbKey* tmp = (STbDbKey*)pKey;
    SIdInfo*  idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
    if (idData) {
338
      continue;
wmmhello's avatar
wmmhello 已提交
339
    }
340 341
    SIdInfo info = {.version = tmp->version, .index = 0};
    taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo));
wmmhello's avatar
wmmhello 已提交
342 343 344 345 346

    SDecoder   dc = {0};
    SMetaEntry me = {0};
    tDecoderInit(&dc, pVal, vLen);
    metaDecodeEntry(&dc, &me);
347
    if (ctx->subType == TOPIC_SUB_TYPE__TABLE) {
wmmhello's avatar
wmmhello 已提交
348
      if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
349
          (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) {
350
        tDecoderClear(&dc);
wmmhello's avatar
wmmhello 已提交
351 352 353 354
        continue;
      }
    }

355 356
    if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) ||
        (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
wmmhello's avatar
wmmhello 已提交
357 358 359 360 361
      saveSuperTableInfoForChildTable(&me, ctx->suidInfo);
    }
    tDecoderClear(&dc);
  }

362 363
  for (int i = 0; i < taosArrayGetSize(ctx->idList); i++) {
    int64_t* uid = taosArrayGet(ctx->idList, i);
wmmhello's avatar
wmmhello 已提交
364
    SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, uid, sizeof(int64_t));
365 366 367 368 369
    if (!idData) {
      metaError("meta/snap: null idData");
      return TSDB_CODE_FAILED;
    }

wmmhello's avatar
wmmhello 已提交
370
    idData->index = i;
371 372
    metaDebug("tmqsnap init idVersion uid:%" PRIi64 " version:%" PRIi64 " index:%d", *uid, idData->version,
              idData->index);
373
  }
wmmhello's avatar
wmmhello 已提交
374

375 376
  tdbFree(pKey);
  tdbFree(pVal);
377 378 379
  return TDB_CODE_SUCCESS;
}

380
/**
 * Release everything owned by a snapshot context: its cursor, the uid list,
 * both hash tables and the context itself.
 *
 * A NULL ctx is accepted and ignored; buildSnapContext can fail before it
 * assigns the caller's pointer.
 *
 * @return always 0
 */
int32_t destroySnapContext(SSnapContext* ctx) {
  if (ctx == NULL) {
    return 0;
  }

  tdbTbcClose((TBC*)ctx->pCur);
  taosArrayDestroy(ctx->idList);
  taosHashCleanup(ctx->idVersion);
  taosHashCleanup(ctx->suidInfo);
  taosMemoryFree(ctx);
  return 0;
}

389 390
/**
 * Encode a single create-table request as a one-element batch, placed after
 * room for an SMsgHead.
 *
 * @param req      request to encode
 * @param pBuf     receives the allocated buffer (SMsgHead-sized gap followed
 *                 by the encoded batch); NULL when encoding fails
 * @param contLen  receives the buffer length, SMsgHead included
 * @return 0 on success, -1 on allocation or encoding failure
 */
static int32_t buildNormalChildTableInfo(SVCreateTbReq* req, void** pBuf, int32_t* contLen) {
  int32_t            ret = 0;
  int32_t            bodyLen = 0;
  SEncoder           encoder = {0};
  SVCreateTbBatchReq reqs = {0};

  reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
  if (NULL == reqs.pArray) {
    ret = -1;
    goto end;
  }
  if (taosArrayPush(reqs.pArray, req) == NULL) {
    ret = -1;
    goto end;
  }
  reqs.nReqs = 1;

  tEncodeSize(tEncodeSVCreateTbBatchReq, &reqs, *contLen, ret);
  if (ret < 0) {
    ret = -1;
    goto end;
  }
  bodyLen = *contLen;  // encoded size without the message head

  *contLen += sizeof(SMsgHead);
  *pBuf = taosMemoryMalloc(*contLen);
  if (NULL == *pBuf) {
    ret = -1;
    goto end;
  }

  // Only bodyLen bytes remain after the head, so that is the encoder capacity.
  tEncoderInit(&encoder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), bodyLen);
  if (tEncodeSVCreateTbBatchReq(&encoder, &reqs) < 0) {
    taosMemoryFreeClear(*pBuf);
    tEncoderClear(&encoder);
    ret = -1;
    goto end;
  }
  tEncoderClear(&encoder);

end:
  taosArrayDestroy(reqs.pArray);
  return ret;
}

427
/**
 * Encode a create-super-table request, placed after room for an SMsgHead.
 *
 * @param req      request to encode
 * @param pBuf     receives the allocated buffer (SMsgHead-sized gap followed
 *                 by the encoded request); NULL when encoding fails
 * @param contLen  receives the buffer length, SMsgHead included
 * @return 0 on success, -1 on allocation or encoding failure
 */
static int32_t buildSuperTableInfo(SVCreateStbReq* req, void** pBuf, int32_t* contLen) {
  int32_t ret = 0;
  tEncodeSize(tEncodeSVCreateStbReq, req, *contLen, ret);
  if (ret < 0) {
    return -1;
  }
  int32_t bodyLen = *contLen;  // encoded size without the message head

  *contLen += sizeof(SMsgHead);
  *pBuf = taosMemoryMalloc(*contLen);
  if (NULL == *pBuf) {
    return -1;
  }

  SEncoder encoder = {0};
  // Only bodyLen bytes remain after the head, so that is the encoder capacity.
  tEncoderInit(&encoder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), bodyLen);
  if (tEncodeSVCreateStbReq(&encoder, req) < 0) {
    taosMemoryFreeClear(*pBuf);
    tEncoderClear(&encoder);
    return -1;
  }
  tEncoderClear(&encoder);
  return 0;
}

451
/**
 * Position the context's iteration index.
 *
 * @param ctx  snapshot context
 * @param uid  0 to restart from the beginning of the uid list, otherwise the
 *             uid whose recorded list position becomes the current index
 * @return 0 on success, -1 when `uid` has no record in ctx->idVersion
 */
int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) {
  if (uid != 0) {
    SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, &uid, sizeof(tb_uid_t));
    if (idInfo == NULL) {
      return -1;
    }
    ctx->index = idInfo->index;
  } else {
    ctx->index = 0;
  }

  return 0;
}

H
Haojun Liao 已提交
469
/**
 * Produce the create-table message for the next table in the snapshot.
 *
 * Advances ctx->index through ctx->idList until an entry is found at its
 * recorded {version, uid} position, decodes it and encodes the matching
 * create request (super table, child table or normal table).
 *
 * @param ctx      snapshot context
 * @param pBuf     receives the encoded message buffer (see the build helpers)
 * @param contLen  receives the message length
 * @param type     receives TDMT_VND_CREATE_STB or TDMT_VND_CREATE_TABLE
 * @param uid      receives the uid of the table that was processed
 * @return 0 on success and also when the list is exhausted (in which case
 *         ctx->index is reset and ctx->queryMeta cleared, and the output
 *         parameters are not written); TSDB_CODE_FAILED,
 *         TSDB_CODE_OUT_OF_MEMORY or -1 on failure
 */
int32_t getTableInfoFromSnapshot(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid) {
  int32_t ret = 0;
  void*   pKey = NULL;
  void*   pVal = NULL;
  int     vLen = 0, kLen = 0;

  while (1) {
    if (ctx->index >= taosArrayGetSize(ctx->idList)) {
      metaDebug("tmqsnap get meta end");
      ctx->index = 0;
      ctx->queryMeta = false;  // change to get data
      return 0;
    }

    int64_t* uidTmp = taosArrayGet(ctx->idList, ctx->index);
    ctx->index++;
    SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, uidTmp, sizeof(tb_uid_t));
    if (!idInfo) {
      metaError("meta/snap: null idInfo");
      return TSDB_CODE_FAILED;
    }

    *uid = *uidTmp;
    ret = MoveToPosition(ctx, idInfo->version, *uidTmp);
    if (ret == 0) {
      break;
    }
    metaDebug("tmqsnap get meta not exist uid:%" PRIi64 " version:%" PRIi64, *uid, idInfo->version);
  }

  tdbTbcGet((TBC*)ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
  SDecoder   dc = {0};
  SMetaEntry me = {0};
  tDecoderInit(&dc, pVal, vLen);
  metaDecodeEntry(&dc, &me);
  metaDebug("tmqsnap get meta uid:%" PRIi64 " name:%s index:%d", *uid, me.name, ctx->index - 1);

  if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) ||
      (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
    SVCreateStbReq req = {0};
    req.name = me.name;
    req.suid = me.uid;
    req.schemaRow = me.stbEntry.schemaRow;
    req.schemaTag = me.stbEntry.schemaTag;
    req.schemaRow.version = 1;
    req.schemaTag.version = 1;

    ret = buildSuperTableInfo(&req, pBuf, contLen);
    *type = TDMT_VND_CREATE_STB;

  } else if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) ||
             (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid)) {
    STableInfoForChildTable* data =
        (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
    if (!data) {
      metaError("meta/snap: null data");
      ret = TSDB_CODE_FAILED;
      goto _exit;  // the decoder must still be cleared
    }

    SVCreateTbReq req = {0};

    req.type = TSDB_CHILD_TABLE;
    req.name = me.name;
    req.uid = me.uid;
    req.commentLen = -1;
    req.ctb.suid = me.ctbEntry.suid;
    req.ctb.tagNum = data->tagRow->nCols;
    req.ctb.stbName = data->tableName;

    // Names of the tags actually present on this child table.
    SArray* tagName = taosArrayInit(req.ctb.tagNum, TSDB_COL_NAME_LEN);
    if (tagName == NULL) {
      ret = TSDB_CODE_OUT_OF_MEMORY;
      goto _exit;
    }

    STag* p = (STag*)me.ctbEntry.pTags;
    if (tTagIsJson(p)) {
      if (p->nTag != 0) {
        SSchema* schema = &data->tagRow->pSchema[0];
        taosArrayPush(tagName, schema->name);
      }
    } else {
      SArray* pTagVals = NULL;
      if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
        metaError("meta/snap: tag to val array failed.");
        taosArrayDestroy(tagName);
        ret = TSDB_CODE_FAILED;
        goto _exit;
      }
      int16_t nCols = taosArrayGetSize(pTagVals);
      for (int j = 0; j < nCols; ++j) {
        STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
        // Map the tag value's column id back to its name in the super table tag schema.
        for (int i = 0; i < data->tagRow->nCols; i++) {
          SSchema* schema = &data->tagRow->pSchema[i];
          if (schema->colId == pTagVal->cid) {
            taosArrayPush(tagName, schema->name);
          }
        }
      }
      taosArrayDestroy(pTagVals);
    }

    req.ctb.pTag = me.ctbEntry.pTags;
    req.ctb.tagName = tagName;
    ret = buildNormalChildTableInfo(&req, pBuf, contLen);
    *type = TDMT_VND_CREATE_TABLE;
    taosArrayDestroy(tagName);
  } else if (ctx->subType == TOPIC_SUB_TYPE__DB) {
    SVCreateTbReq req = {0};
    req.type = TSDB_NORMAL_TABLE;
    req.name = me.name;
    req.uid = me.uid;
    req.commentLen = -1;
    req.ntb.schemaRow = me.ntbEntry.schemaRow;
    ret = buildNormalChildTableInfo(&req, pBuf, contLen);
    *type = TDMT_VND_CREATE_TABLE;
  } else {
    metaError("meta/snap: invalid topic sub type: %" PRId8 " get meta from snap failed.", ctx->subType);
    ret = -1;
  }

_exit:
  tDecoderClear(&dc);

  return ret;
}

H
Haojun Liao 已提交
601
/**
 * Return uid, suid, name and a cloned row schema for the next data-bearing
 * table (child or normal table) in the snapshot.
 *
 * Advances ctx->index through ctx->idList, skipping uids that are not found at
 * their recorded position and entries that are not eligible for the
 * subscription type.
 *
 * @param ctx  snapshot context
 * @return the table info; a zero-initialized value when the list is
 *         exhausted, a uid has no version record, or the super-table info of
 *         a child table is missing. result.schema is a clone made with
 *         tCloneSSchemaWrapper.
 */
SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext* ctx) {
  SMetaTableInfo result = {0};
  void*          pKey = NULL;
  void*          pVal = NULL;
  int            vLen = 0, kLen = 0;

  while (1) {
    if (ctx->index >= taosArrayGetSize(ctx->idList)) {
      metaDebug("tmqsnap get uid info end");
      return result;
    }
    int64_t* uidTmp = taosArrayGet(ctx->idList, ctx->index);
    ctx->index++;
    SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, uidTmp, sizeof(tb_uid_t));
    if (!idInfo) {
      metaError("meta/snap: null idInfo");
      return result;
    }

    int32_t ret = MoveToPosition(ctx, idInfo->version, *uidTmp);
    if (ret != 0) {
      metaDebug("tmqsnap getMetaTableInfoFromSnapshot not exist uid:%" PRIi64 " version:%" PRIi64, *uidTmp, idInfo->version);
      continue;
    }
    tdbTbcGet((TBC*)ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
    SDecoder   dc = {0};
    SMetaEntry me = {0};
    tDecoderInit(&dc, pVal, vLen);
    metaDecodeEntry(&dc, &me);
    metaDebug("tmqsnap get uid info uid:%" PRIi64 " name:%s index:%d", me.uid, me.name, ctx->index - 1);

    // A child table qualifies for DB subscriptions, and for table
    // subscriptions when it belongs to the subscribed super table.
    bool wantChild = (me.type == TSDB_CHILD_TABLE) &&
                     (ctx->subType == TOPIC_SUB_TYPE__DB ||
                      (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.ctbEntry.suid == ctx->suid));
    bool wantNormal = (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_NORMAL_TABLE);

    if (wantChild) {
      STableInfoForChildTable* data =
          (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
      if (!data) {
        // Without the cached super-table info there is no schema to clone.
        metaError("meta/snap: null data");
        tDecoderClear(&dc);
        return result;
      }
      result.uid = me.uid;
      result.suid = me.ctbEntry.suid;
      result.schema = tCloneSSchemaWrapper(data->schemaRow);
      strcpy(result.tbName, me.name);
      tDecoderClear(&dc);
      break;
    } else if (wantNormal) {
      result.uid = me.uid;
      result.suid = 0;
      strcpy(result.tbName, me.name);
      result.schema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow);
      tDecoderClear(&dc);
      break;
    } else {
      metaDebug("tmqsnap get uid continue");
      tDecoderClear(&dc);
      continue;
    }
  }

  return result;
}