tsdbSnapshot.c 54.9 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

H
Hongze Cheng 已提交
18
extern int32_t tsdbReadDataBlockEx(SDataFReader* pReader, SDataBlk* pDataBlk, SBlockData* pBlockData);
H
Hongze Cheng 已提交
19 20 21
extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo);
extern int32_t tsdbWriteDataBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SMapData* mDataBlk, int8_t cmprAlg);
extern int32_t tsdbWriteSttBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SArray* aSttBlk, int8_t cmprAlg);
H
Hongze Cheng 已提交
22 23 24 25 26

// STsdbDataIter2 ========================================
#define TSDB_MEM_TABLE_DATA_ITER 0
#define TSDB_DATA_FILE_DATA_ITER 1
#define TSDB_STT_FILE_DATA_ITER  2
H
Hongze Cheng 已提交
27
#define TSDB_TOMB_FILE_DATA_ITER 3
H
Hongze Cheng 已提交
28

H
Hongze Cheng 已提交
29 30 31
typedef struct STsdbDataIter2  STsdbDataIter2;
typedef struct STsdbFilterInfo STsdbFilterInfo;

H
Hongze Cheng 已提交
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
struct STsdbDataIter2 {
  STsdbDataIter2* next;
  SRBTreeNode     rbtn;

  int32_t  type;
  SRowInfo rowInfo;
  union {
    // TSDB_MEM_TABLE_DATA_ITER
    struct {
      SMemTable* pMemTable;
    } mIter;

    // TSDB_DATA_FILE_DATA_ITER
    struct {
      SDataFReader* pReader;
      SArray*       aBlockIdx;  // SArray<SBlockIdx>
      SMapData      mDataBlk;
      SBlockData    bData;
      int32_t       iBlockIdx;
      int32_t       iDataBlk;
      int32_t       iRow;
    } dIter;

    // TSDB_STT_FILE_DATA_ITER
    struct {
      SDataFReader* pReader;
      int32_t       iStt;
      SArray*       aSttBlk;
      SBlockData    bData;
      int32_t       iSttBlk;
      int32_t       iRow;
    } sIter;
H
Hongze Cheng 已提交
64 65 66 67 68 69 70 71
    // TSDB_TOMB_FILE_DATA_ITER
    struct {
      SDelFReader* pReader;
      SArray*      aDelIdx;
      SArray*      aDelData;
      int32_t      iDelIdx;
      int32_t      iDelData;
    } tIter;
H
Hongze Cheng 已提交
72 73 74
  };
};

H
Hongze Cheng 已提交
75 76 77 78 79 80 81
#define TSDB_FILTER_FLAG_BY_VERSION 0x1
struct STsdbFilterInfo {
  int32_t flag;
  int64_t sver;
  int64_t ever;
};

H
Hongze Cheng 已提交
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
#define TSDB_RBTN_TO_DATA_ITER(pNode) ((STsdbDataIter2*)(((char*)pNode) - offsetof(STsdbDataIter2, rbtn)))

/* open */
static int32_t tsdbOpenDataFileDataIter(SDataFReader* pReader, STsdbDataIter2** ppIter) {
  int32_t code = 0;
  int32_t lino = 0;

  // create handle
  STsdbDataIter2* pIter = (STsdbDataIter2*)taosMemoryCalloc(1, sizeof(*pIter));
  if (pIter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  pIter->type = TSDB_DATA_FILE_DATA_ITER;
  pIter->dIter.pReader = pReader;
  if ((pIter->dIter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tBlockDataCreate(&pIter->dIter.bData);
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
106 107 108
  pIter->dIter.iBlockIdx = 0;
  pIter->dIter.iDataBlk = 0;
  pIter->dIter.iRow = 0;
H
Hongze Cheng 已提交
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152

  // read data
  code = tsdbReadBlockIdx(pReader, pIter->dIter.aBlockIdx);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (taosArrayGetSize(pIter->dIter.aBlockIdx) == 0) goto _clear;

_exit:
  if (code) {
    if (pIter) {
    _clear:
      tBlockDataDestroy(&pIter->dIter.bData, 1);
      taosArrayDestroy(pIter->dIter.aBlockIdx);
      taosMemoryFree(pIter);
      pIter = NULL;
    }
  }
  *ppIter = pIter;
  return code;
}

static int32_t tsdbOpenSttFileDataIter(SDataFReader* pReader, int32_t iStt, STsdbDataIter2** ppIter) {
  int32_t code = 0;
  int32_t lino = 0;

  // create handle
  STsdbDataIter2* pIter = (STsdbDataIter2*)taosMemoryCalloc(1, sizeof(*pIter));
  if (pIter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  pIter->type = TSDB_STT_FILE_DATA_ITER;
  pIter->sIter.pReader = pReader;
  pIter->sIter.iStt = iStt;
  pIter->sIter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
  if (pIter->sIter.aSttBlk == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tBlockDataCreate(&pIter->sIter.bData);
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
153 154
  pIter->sIter.iSttBlk = 0;
  pIter->sIter.iRow = 0;
H
Hongze Cheng 已提交
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175

  // read data
  code = tsdbReadSttBlk(pReader, iStt, pIter->sIter.aSttBlk);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (taosArrayGetSize(pIter->sIter.aSttBlk) == 0) goto _clear;

_exit:
  if (code) {
    if (pIter) {
    _clear:
      taosArrayDestroy(pIter->sIter.aSttBlk);
      tBlockDataDestroy(&pIter->sIter.bData, 1);
      taosMemoryFree(pIter);
      pIter = NULL;
    }
  }
  *ppIter = pIter;
  return code;
}

H
Hongze Cheng 已提交
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
static int32_t tsdbOpenTombFileDataIter(SDelFReader* pReader, STsdbDataIter2** ppIter) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdbDataIter2* pIter = (STsdbDataIter2*)taosMemoryCalloc(1, sizeof(*pIter));
  if (pIter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  pIter->type = TSDB_TOMB_FILE_DATA_ITER;

  pIter->tIter.pReader = pReader;
  if ((pIter->tIter.aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  if ((pIter->tIter.aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tsdbReadDelIdx(pReader, pIter->tIter.aDelIdx);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (taosArrayGetSize(pIter->tIter.aDelIdx) == 0) goto _clear;

  pIter->tIter.iDelIdx = 0;
  pIter->tIter.iDelData = 0;

_exit:
  if (code) {
    if (pIter) {
    _clear:
      taosArrayDestroy(pIter->tIter.aDelIdx);
      taosArrayDestroy(pIter->tIter.aDelData);
      taosMemoryFree(pIter);
      pIter = NULL;
    }
  }
  *ppIter = pIter;
  return code;
}

H
Hongze Cheng 已提交
219 220 221 222 223 224 225 226 227 228 229 230 231 232
/* close */
static void tsdbCloseDataFileDataIter(STsdbDataIter2* pIter) {
  tBlockDataDestroy(&pIter->dIter.bData, 1);
  tMapDataClear(&pIter->dIter.mDataBlk);
  taosArrayDestroy(pIter->dIter.aBlockIdx);
  taosMemoryFree(pIter);
}

static void tsdbCloseSttFileDataIter(STsdbDataIter2* pIter) {
  tBlockDataDestroy(&pIter->sIter.bData, 1);
  taosArrayDestroy(pIter->sIter.aSttBlk);
  taosMemoryFree(pIter);
}

H
Hongze Cheng 已提交
233 234 235 236 237 238
static void tsdbCloseTombFileDataIter(STsdbDataIter2* pIter) {
  taosArrayDestroy(pIter->tIter.aDelData);
  taosArrayDestroy(pIter->tIter.aDelIdx);
  taosMemoryFree(pIter);
}

H
Hongze Cheng 已提交
239 240 241 242 243 244 245
static void tsdbCloseDataIter2(STsdbDataIter2* pIter) {
  if (pIter->type == TSDB_MEM_TABLE_DATA_ITER) {
    ASSERT(0);
  } else if (pIter->type == TSDB_DATA_FILE_DATA_ITER) {
    tsdbCloseDataFileDataIter(pIter);
  } else if (pIter->type == TSDB_STT_FILE_DATA_ITER) {
    tsdbCloseSttFileDataIter(pIter);
H
Hongze Cheng 已提交
246 247
  } else if (pIter->type == TSDB_TOMB_FILE_DATA_ITER) {
    tsdbCloseTombFileDataIter(pIter);
H
Hongze Cheng 已提交
248 249 250 251 252 253 254 255 256 257 258 259 260 261 262
  } else {
    ASSERT(0);
  }
}

/* cmpr */
static int32_t tsdbDataIterCmprFn(const SRBTreeNode* pNode1, const SRBTreeNode* pNode2) {
  STsdbDataIter2* pIter1 = TSDB_RBTN_TO_DATA_ITER(pNode1);
  STsdbDataIter2* pIter2 = TSDB_RBTN_TO_DATA_ITER(pNode2);
  return tRowInfoCmprFn(&pIter1->rowInfo, &pIter2->rowInfo);
}

/* seek */

/* iter next */
H
Hongze Cheng 已提交
263
static int32_t tsdbDataFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) {
H
Hongze Cheng 已提交
264 265 266 267
  int32_t code = 0;
  int32_t lino = 0;

  for (;;) {
H
Hongze Cheng 已提交
268 269 270 271 272 273 274 275 276 277 278
    while (pIter->dIter.iRow < pIter->dIter.bData.nRow) {
      if (pFilterInfo) {
        if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
          if (pIter->dIter.bData.aVersion[pIter->dIter.iRow] < pFilterInfo->sver ||
              pIter->dIter.bData.aVersion[pIter->dIter.iRow] > pFilterInfo->ever) {
            pIter->dIter.iRow++;
            continue;
          }
        }
      }

H
Hongze Cheng 已提交
279 280 281
      pIter->rowInfo.suid = pIter->dIter.bData.suid;
      pIter->rowInfo.uid = pIter->dIter.bData.uid;
      pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->dIter.bData, pIter->dIter.iRow);
H
Hongze Cheng 已提交
282
      pIter->dIter.iRow++;
H
Hongze Cheng 已提交
283
      goto _exit;
H
Hongze Cheng 已提交
284 285 286
    }

    for (;;) {
H
Hongze Cheng 已提交
287
      while (pIter->dIter.iDataBlk < pIter->dIter.mDataBlk.nItem) {
H
Hongze Cheng 已提交
288 289 290
        SDataBlk dataBlk;
        tMapDataGetItemByIdx(&pIter->dIter.mDataBlk, pIter->dIter.iDataBlk, &dataBlk, tGetDataBlk);

H
Hongze Cheng 已提交
291 292 293 294 295 296 297 298 299 300
        // filter
        if (pFilterInfo) {
          if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
            if (pFilterInfo->sver > dataBlk.maxVer || pFilterInfo->ever < dataBlk.minVer) {
              pIter->dIter.iDataBlk++;
              continue;
            }
          }
        }

H
Hongze Cheng 已提交
301 302 303
        code = tsdbReadDataBlockEx(pIter->dIter.pReader, &dataBlk, &pIter->dIter.bData);
        TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
304 305
        pIter->dIter.iDataBlk++;
        pIter->dIter.iRow = 0;
H
Hongze Cheng 已提交
306 307 308 309

        break;
      }

H
Hongze Cheng 已提交
310 311
      if (pIter->dIter.iRow < pIter->dIter.bData.nRow) break;

H
Hongze Cheng 已提交
312
      for (;;) {
H
Hongze Cheng 已提交
313
        if (pIter->dIter.iBlockIdx < taosArrayGetSize(pIter->dIter.aBlockIdx)) {
H
Hongze Cheng 已提交
314 315 316 317 318
          SBlockIdx* pBlockIdx = taosArrayGet(pIter->dIter.aBlockIdx, pIter->dIter.iBlockIdx);

          code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk);
          TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
319 320
          pIter->dIter.iBlockIdx++;
          pIter->dIter.iDataBlk = 0;
H
Hongze Cheng 已提交
321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337

          break;
        } else {
          pIter->rowInfo = (SRowInfo){0};
          goto _exit;
        }
      }
    }
  }

_exit:
  if (code) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
338
static int32_t tsdbSttFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) {
H
Hongze Cheng 已提交
339 340 341 342
  int32_t code = 0;
  int32_t lino = 0;

  for (;;) {
H
Hongze Cheng 已提交
343 344 345 346 347 348 349 350 351 352 353
    while (pIter->sIter.iRow < pIter->sIter.bData.nRow) {
      if (pFilterInfo) {
        if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
          if (pFilterInfo->sver > pIter->sIter.bData.aVersion[pIter->sIter.iRow] ||
              pFilterInfo->ever < pIter->sIter.bData.aVersion[pIter->sIter.iRow]) {
            pIter->sIter.iRow++;
            continue;
          }
        }
      }

H
Hongze Cheng 已提交
354 355 356
      pIter->rowInfo.suid = pIter->sIter.bData.suid;
      pIter->rowInfo.uid = pIter->sIter.bData.uid ? pIter->sIter.bData.uid : pIter->sIter.bData.aUid[pIter->sIter.iRow];
      pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->sIter.bData, pIter->sIter.iRow);
H
Hongze Cheng 已提交
357
      pIter->sIter.iRow++;
H
Hongze Cheng 已提交
358
      goto _exit;
H
Hongze Cheng 已提交
359 360
    }

H
Hongze Cheng 已提交
361 362 363 364 365 366 367 368 369 370 371 372
    for (;;) {
      if (pIter->sIter.iSttBlk < taosArrayGetSize(pIter->sIter.aSttBlk)) {
        SSttBlk* pSttBlk = taosArrayGet(pIter->sIter.aSttBlk, pIter->sIter.iSttBlk);

        if (pFilterInfo) {
          if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
            if (pFilterInfo->sver > pSttBlk->maxVer || pFilterInfo->ever < pSttBlk->minVer) {
              pIter->sIter.iSttBlk++;
              continue;
            }
          }
        }
H
Hongze Cheng 已提交
373

H
Hongze Cheng 已提交
374 375
        code = tsdbReadSttBlockEx(pIter->sIter.pReader, pIter->sIter.iStt, pSttBlk, &pIter->sIter.bData);
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
376

H
Hongze Cheng 已提交
377 378 379 380 381 382 383
        pIter->sIter.iRow = 0;
        pIter->sIter.iSttBlk++;
        break;
      } else {
        pIter->rowInfo = (SRowInfo){0};
        goto _exit;
      }
H
Hongze Cheng 已提交
384 385 386 387 388 389 390 391 392 393
    }
  }

_exit:
  if (code) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
394
static int32_t tsdbDataIterNext2(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) {
H
Hongze Cheng 已提交
395 396 397 398 399 400
  int32_t code = 0;

  if (pIter->type == TSDB_MEM_TABLE_DATA_ITER) {
    ASSERT(0);
    return code;
  } else if (pIter->type == TSDB_DATA_FILE_DATA_ITER) {
H
Hongze Cheng 已提交
401
    return tsdbDataFileDataIterNext(pIter, pFilterInfo);
H
Hongze Cheng 已提交
402
  } else if (pIter->type == TSDB_STT_FILE_DATA_ITER) {
H
Hongze Cheng 已提交
403
    return tsdbSttFileDataIterNext(pIter, pFilterInfo);
H
Hongze Cheng 已提交
404 405 406 407 408 409 410 411
  } else {
    ASSERT(0);
    return code;
  }
}

/* get */

H
Hongze Cheng 已提交
412
// STsdbSnapReader ========================================
H
Hongze Cheng 已提交
413
struct STsdbSnapReader {
H
Hongze Cheng 已提交
414 415 416 417 418
  STsdb*   pTsdb;
  int64_t  sver;
  int64_t  ever;
  int8_t   type;
  uint8_t* aBuf[5];
H
Hongze Cheng 已提交
419

H
Hongze Cheng 已提交
420
  STsdbFS  fs;
H
Hongze Cheng 已提交
421
  TABLEID  tbid;
H
Hongze Cheng 已提交
422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437
  SSkmInfo skmTable;

  // timeseries data
  int8_t  dataDone;
  int32_t fid;

  SDataFReader*   pDataFReader;
  STsdbDataIter2* iterList;
  STsdbDataIter2* pIter;
  SRBTree         rbt;
  SBlockData      bData;

  // tombstone data
  int8_t          delDone;
  SDelFReader*    pDelFReader;
  STsdbDataIter2* pTIter;
H
Hongze Cheng 已提交
438
  SArray*         aDelData;
H
Hongze Cheng 已提交
439
};
H
Hongze Cheng 已提交
440

H
Hongze Cheng 已提交
441
static int32_t tsdbSnapReadFileDataStart(STsdbSnapReader* pReader) {
H
Hongze Cheng 已提交
442
  int32_t code = 0;
H
add log  
Hongze Cheng 已提交
443
  int32_t lino = 0;
H
Hongze Cheng 已提交
444

H
Hongze Cheng 已提交
445 446 447 448 449
  SDFileSet* pSet = taosArraySearch(pReader->fs.aDFileSet, &(SDFileSet){.fid = pReader->fid}, tDFileSetCmprFn, TD_GT);
  if (pSet == NULL) {
    pReader->fid = INT32_MAX;
    goto _exit;
  }
H
Hongze Cheng 已提交
450

H
Hongze Cheng 已提交
451 452
  pReader->fid = pSet->fid;

H
Hongze Cheng 已提交
453
  tRBTreeCreate(&pReader->rbt, tsdbDataIterCmprFn);
H
Hongze Cheng 已提交
454

H
Hongze Cheng 已提交
455
  code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet);
H
add log  
Hongze Cheng 已提交
456
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
457

H
Hongze Cheng 已提交
458 459
  code = tsdbOpenDataFileDataIter(pReader->pDataFReader, &pReader->pIter);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
460

H
Hongze Cheng 已提交
461 462 463 464 465 466
  if (pReader->pIter) {
    // iter to next with filter info (sver, ever)
    code = tsdbDataIterNext2(pReader->pIter,
                             &(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_VERSION,  // flag
                                                .sver = pReader->sver,
                                                .ever = pReader->ever});
H
add log  
Hongze Cheng 已提交
467
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
468

H
Hongze Cheng 已提交
469 470 471
    if (pReader->pIter->rowInfo.suid || pReader->pIter->rowInfo.uid) {
      // add to rbtree
      tRBTreePut(&pReader->rbt, &pReader->pIter->rbtn);
H
Hongze Cheng 已提交
472

H
Hongze Cheng 已提交
473 474 475 476 477
      // add to iterList
      pReader->pIter->next = pReader->iterList;
      pReader->iterList = pReader->pIter;
    } else {
      tsdbCloseDataIter2(pReader->pIter);
H
Hongze Cheng 已提交
478 479
    }
  }
H
Hongze Cheng 已提交
480

H
Hongze Cheng 已提交
481 482
  for (int32_t iStt = 0; iStt < pSet->nSttF; ++iStt) {
    code = tsdbOpenSttFileDataIter(pReader->pDataFReader, iStt, &pReader->pIter);
H
add log  
Hongze Cheng 已提交
483
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
484

H
Hongze Cheng 已提交
485 486 487 488 489 490
    if (pReader->pIter) {
      // iter to valid row
      code = tsdbDataIterNext2(pReader->pIter,
                               &(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_VERSION,  // flag
                                                  .sver = pReader->sver,
                                                  .ever = pReader->ever});
H
add log  
Hongze Cheng 已提交
491
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
492

H
Hongze Cheng 已提交
493 494 495
      if (pReader->pIter->rowInfo.suid || pReader->pIter->rowInfo.uid) {
        // add to rbtree
        tRBTreePut(&pReader->rbt, &pReader->pIter->rbtn);
H
Hongze Cheng 已提交
496

H
Hongze Cheng 已提交
497 498 499 500 501
        // add to iterList
        pReader->pIter->next = pReader->iterList;
        pReader->iterList = pReader->pIter;
      } else {
        tsdbCloseDataIter2(pReader->pIter);
H
Hongze Cheng 已提交
502
      }
H
Hongze Cheng 已提交
503
    }
H
Hongze Cheng 已提交
504
  }
H
Hongze Cheng 已提交
505

H
Hongze Cheng 已提交
506 507
  pReader->pIter = NULL;

H
add log  
Hongze Cheng 已提交
508 509
_exit:
  if (code) {
H
Hongze Cheng 已提交
510
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
H
add log  
Hongze Cheng 已提交
511
  } else {
H
Hongze Cheng 已提交
512
    tsdbInfo("vgId:%d %s done, fid:%d", TD_VID(pReader->pTsdb->pVnode), __func__, pReader->fid);
H
add log  
Hongze Cheng 已提交
513
  }
H
Hongze Cheng 已提交
514 515 516
  return code;
}

H
Hongze Cheng 已提交
517 518 519 520 521 522
static void tsdbSnapReadFileDataEnd(STsdbSnapReader* pReader) {
  while (pReader->iterList) {
    STsdbDataIter2* pIter = pReader->iterList;
    pReader->iterList = pIter->next;
    tsdbCloseDataIter2(pIter);
  }
H
Hongze Cheng 已提交
523

H
Hongze Cheng 已提交
524 525
  tsdbDataFReaderClose(&pReader->pDataFReader);
}
H
Hongze Cheng 已提交
526

H
Hongze Cheng 已提交
527 528 529
static int32_t tsdbSnapReadNextRow(STsdbSnapReader* pReader, SRowInfo** ppRowInfo) {
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
530

H
Hongze Cheng 已提交
531 532 533 534 535
  if (pReader->pIter) {
    code = tsdbDataIterNext2(pReader->pIter, &(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_VERSION,  // flag
                                                                .sver = pReader->sver,
                                                                .ever = pReader->ever});
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
536

H
Hongze Cheng 已提交
537 538 539 540 541 542 543 544 545 546 547
    if (pReader->pIter->rowInfo.suid == 0 && pReader->pIter->rowInfo.uid == 0) {
      pReader->pIter = NULL;
    } else {
      SRBTreeNode* pNode = tRBTreeMin(&pReader->rbt);
      if (pNode) {
        int32_t c = tsdbDataIterCmprFn(&pReader->pIter->rbtn, pNode);
        if (c > 0) {
          tRBTreePut(&pReader->rbt, &pReader->pIter->rbtn);
          pReader->pIter = NULL;
        } else if (c == 0) {
          ASSERT(0);
H
Hongze Cheng 已提交
548
        }
H
Hongze Cheng 已提交
549 550
      }
    }
H
Hongze Cheng 已提交
551
  }
H
Hongze Cheng 已提交
552

H
Hongze Cheng 已提交
553 554 555 556 557
  if (pReader->pIter == NULL) {
    SRBTreeNode* pNode = tRBTreeMin(&pReader->rbt);
    if (pNode) {
      tRBTreeDrop(&pReader->rbt, pNode);
      pReader->pIter = TSDB_RBTN_TO_DATA_ITER(pNode);
H
Hongze Cheng 已提交
558 559 560
    }
  }

H
Hongze Cheng 已提交
561
  if (ppRowInfo) {
H
Hongze Cheng 已提交
562
    if (pReader->pIter) {
H
Hongze Cheng 已提交
563 564 565
      *ppRowInfo = &pReader->pIter->rowInfo;
    } else {
      *ppRowInfo = NULL;
H
Hongze Cheng 已提交
566 567 568
    }
  }

H
Hongze Cheng 已提交
569 570 571 572
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
H
Hongze Cheng 已提交
573 574 575
  return code;
}

H
Hongze Cheng 已提交
576
static int32_t tsdbSnapReadGetRow(STsdbSnapReader* pReader, SRowInfo** ppRowInfo) {
H
Hongze Cheng 已提交
577
  if (pReader->pIter) {
H
Hongze Cheng 已提交
578 579
    *ppRowInfo = &pReader->pIter->rowInfo;
    return 0;
H
Hongze Cheng 已提交
580
  }
H
Hongze Cheng 已提交
581 582

  return tsdbSnapReadNextRow(pReader, ppRowInfo);
H
Hongze Cheng 已提交
583 584
}

H
Hongze Cheng 已提交
585 586 587
static int32_t tsdbSnapCmprData(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t code = 0;

588
  ASSERT(pReader->bData.nRow);
H
Hongze Cheng 已提交
589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617

  int32_t aBufN[5] = {0};
  code = tCmprBlockData(&pReader->bData, TWO_STAGE_COMP, NULL, NULL, pReader->aBuf, aBufN);
  if (code) goto _exit;

  int32_t size = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3];
  *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
  if (*ppData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  SSnapDataHdr* pHdr = (SSnapDataHdr*)*ppData;
  pHdr->type = SNAP_DATA_TSDB;
  pHdr->size = size;

  memcpy(pHdr->data, pReader->aBuf[3], aBufN[3]);
  memcpy(pHdr->data + aBufN[3], pReader->aBuf[2], aBufN[2]);
  if (aBufN[1]) {
    memcpy(pHdr->data + aBufN[3] + aBufN[2], pReader->aBuf[1], aBufN[1]);
  }
  if (aBufN[0]) {
    memcpy(pHdr->data + aBufN[3] + aBufN[2] + aBufN[1], pReader->aBuf[0], aBufN[0]);
  }

_exit:
  return code;
}

H
Hongze Cheng 已提交
618
static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* pReader, uint8_t** ppData) {
H
Hongze Cheng 已提交
619
  int32_t code = 0;
H
add log  
Hongze Cheng 已提交
620 621 622
  int32_t lino = 0;

  STsdb* pTsdb = pReader->pTsdb;
H
Hongze Cheng 已提交
623

H
Hongze Cheng 已提交
624 625 626 627
  tBlockDataClear(&pReader->bData);

  for (;;) {
    // start a new file read if need
H
Hongze Cheng 已提交
628
    if (pReader->pDataFReader == NULL) {
H
Hongze Cheng 已提交
629
      code = tsdbSnapReadFileDataStart(pReader);
H
add log  
Hongze Cheng 已提交
630
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
631 632 633 634
    }

    if (pReader->pDataFReader == NULL) break;

H
Hongze Cheng 已提交
635 636 637 638
    SRowInfo* pRowInfo;
    code = tsdbSnapReadGetRow(pReader, &pRowInfo);
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
639
    if (pRowInfo == NULL) {
H
Hongze Cheng 已提交
640
      tsdbSnapReadFileDataEnd(pReader);
H
Hongze Cheng 已提交
641 642 643
      continue;
    }

H
Hongze Cheng 已提交
644
    code = tsdbUpdateTableSchema(pTsdb->pVnode->pMeta, pRowInfo->suid, pRowInfo->uid, &pReader->skmTable);
H
add log  
Hongze Cheng 已提交
645
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
646

H
Hongze Cheng 已提交
647
    code = tBlockDataInit(&pReader->bData, (TABLEID*)pRowInfo, pReader->skmTable.pTSchema, NULL, 0);
H
add log  
Hongze Cheng 已提交
648
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
649

H
Hongze Cheng 已提交
650 651 652 653 654 655 656 657 658 659 660 661 662 663
    do {
      if (!TABLE_SAME_SCHEMA(pReader->bData.suid, pReader->bData.uid, pRowInfo->suid, pRowInfo->uid)) break;

      if (pReader->bData.uid && pReader->bData.uid != pRowInfo->uid) {
        code = tRealloc((uint8_t**)&pReader->bData.aUid, sizeof(int64_t) * (pReader->bData.nRow + 1));
        TSDB_CHECK_CODE(code, lino, _exit);

        for (int32_t iRow = 0; iRow < pReader->bData.nRow; ++iRow) {
          pReader->bData.aUid[iRow] = pReader->bData.uid;
        }
        pReader->bData.uid = 0;
      }

      code = tBlockDataAppendRow(&pReader->bData, &pRowInfo->row, NULL, pRowInfo->uid);
H
add log  
Hongze Cheng 已提交
664
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
665

H
Hongze Cheng 已提交
666
      code = tsdbSnapReadNextRow(pReader, &pRowInfo);
H
add log  
Hongze Cheng 已提交
667
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
668

H
Hongze Cheng 已提交
669 670
      if (pReader->bData.nRow >= 4096) break;
    } while (pRowInfo);
H
Hongze Cheng 已提交
671

H
Hongze Cheng 已提交
672 673 674 675
    ASSERT(pReader->bData.nRow > 0);

    break;
  }
H
Hongze Cheng 已提交
676

H
Hongze Cheng 已提交
677
  if (pReader->bData.nRow > 0) {
H
Hongze Cheng 已提交
678
    code = tsdbSnapCmprData(pReader, ppData);
H
add log  
Hongze Cheng 已提交
679
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
680 681
  }

H
add log  
Hongze Cheng 已提交
682 683
_exit:
  if (code) {
H
Hongze Cheng 已提交
684
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
H
add log  
Hongze Cheng 已提交
685
  }
H
Hongze Cheng 已提交
686 687 688
  return code;
}

H
Hongze Cheng 已提交
689
static int32_t tsdbSnapCmprTombData(STsdbSnapReader* pReader, uint8_t** ppData) {
H
add log  
Hongze Cheng 已提交
690 691 692
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
693 694 695 696
  int64_t size = sizeof(TABLEID);
  for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); ++iDelData) {
    size += tPutDelData(NULL, taosArrayGet(pReader->aDelData, iDelData));
  }
H
Hongze Cheng 已提交
697

H
Hongze Cheng 已提交
698 699 700
  uint8_t* pData = (uint8_t*)taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
  if (pData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
add log  
Hongze Cheng 已提交
701
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
702
  }
H
Hongze Cheng 已提交
703

H
Hongze Cheng 已提交
704 705 706 707 708 709
  SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
  pHdr->type = SNAP_DATA_DEL;
  pHdr->size = size;

  TABLEID* pId = (TABLEID*)(pData + sizeof(SSnapDataHdr));
  *pId = pReader->tbid;
H
Hongze Cheng 已提交
710

H
Hongze Cheng 已提交
711 712 713
  size = sizeof(SSnapDataHdr) + sizeof(TABLEID);
  for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); ++iDelData) {
    size += tPutDelData(pData + size, taosArrayGet(pReader->aDelData, iDelData));
H
Hongze Cheng 已提交
714 715
  }

H
Hongze Cheng 已提交
716 717 718 719 720 721
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
    if (pData) {
      taosMemoryFree(pData);
      pData = NULL;
H
Hongze Cheng 已提交
722
    }
H
Hongze Cheng 已提交
723 724 725 726
  }
  *ppData = pData;
  return code;
}
H
Hongze Cheng 已提交
727

H
Hongze Cheng 已提交
728 729 730
static int32_t tsdbSnapReadGetTombData(STsdbSnapReader* pReader, void* pDelInfo) {
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
731

H
Hongze Cheng 已提交
732 733 734 735 736 737 738 739
  ASSERT(0);
  // TODO
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}
H
Hongze Cheng 已提交
740

H
Hongze Cheng 已提交
741 742 743 744 745 746 747 748 749 750
static int32_t tsdbSnapReadNextTombData(STsdbSnapReader* pReader, void* pDelInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  // TODO
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}
H
Hongze Cheng 已提交
751

H
Hongze Cheng 已提交
752 753 754
static int32_t tsdbSnapReadTombData(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
755

H
Hongze Cheng 已提交
756
  STsdb* pTsdb = pReader->pTsdb;
H
Hongze Cheng 已提交
757

H
Hongze Cheng 已提交
758 759 760 761 762 763 764 765 766 767
  if (pReader->pDelFReader == NULL) {
    if (pReader->fs.pDelFile == NULL) goto _exit;

    // open
    code = tsdbDelFReaderOpen(&pReader->pDelFReader, pReader->fs.pDelFile, pTsdb);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbOpenTombFileDataIter(pReader->pDelFReader, &pReader->pTIter);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
768

H
Hongze Cheng 已提交
769 770 771 772 773 774 775
  struct {
    int64_t  suid;
    int64_t  uid;
    SDelData dData;
  }* pDelInfo;
  code = tsdbSnapReadGetTombData(pReader, &pDelInfo);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
776

H
Hongze Cheng 已提交
777
  if (pDelInfo == NULL) goto _exit;
H
Hongze Cheng 已提交
778

H
Hongze Cheng 已提交
779
  pReader->tbid = *(TABLEID*)pDelInfo;
H
Hongze Cheng 已提交
780

H
Hongze Cheng 已提交
781 782 783 784 785 786 787 788 789 790 791
  if (pReader->aDelData) {
    taosArrayClear(pReader->aDelData);
  } else if ((pReader->aDelData = taosArrayInit(16, sizeof(SDelData))) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  while (pDelInfo && pDelInfo->suid == pReader->tbid.suid && pDelInfo->uid == pReader->tbid.uid) {
    if (taosArrayPush(pReader->aDelData, &pDelInfo->dData) < 0) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
792 793
    }

H
Hongze Cheng 已提交
794 795 796
    code = tsdbSnapReadNextTombData(pReader, &pDelInfo);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
797

H
Hongze Cheng 已提交
798 799 800
  if (taosArrayGetSize(pReader->aDelData) > 0) {
    code = tsdbSnapCmprTombData(pReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
801
  }
H
Hongze Cheng 已提交
802 803

_exit:
H
add log  
Hongze Cheng 已提交
804
  if (code) {
H
Hongze Cheng 已提交
805 806 807
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
H
add log  
Hongze Cheng 已提交
808
  }
H
Hongze Cheng 已提交
809 810
  return code;
}
H
more  
Hongze Cheng 已提交
811

C
Cary Xu 已提交
812
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, STsdbSnapReader** ppReader) {
H
Hongze Cheng 已提交
813 814
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
815

H
more  
Hongze Cheng 已提交
816
  // alloc
H
Hongze Cheng 已提交
817
  STsdbSnapReader* pReader = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
H
more  
Hongze Cheng 已提交
818 819
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
820
    TSDB_CHECK_CODE(code, lino, _exit);
H
more  
Hongze Cheng 已提交
821 822 823 824
  }
  pReader->pTsdb = pTsdb;
  pReader->sver = sver;
  pReader->ever = ever;
C
Cary Xu 已提交
825
  pReader->type = type;
H
more  
Hongze Cheng 已提交
826

H
Hongze Cheng 已提交
827
  taosThreadRwlockRdlock(&pTsdb->rwLock);
H
Hongze Cheng 已提交
828 829 830
  code = tsdbFSRef(pTsdb, &pReader->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
H
Hongze Cheng 已提交
831
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
832
  }
H
Hongze Cheng 已提交
833
  taosThreadRwlockUnlock(&pTsdb->rwLock);
H
Hongze Cheng 已提交
834

H
Hongze Cheng 已提交
835
  // init
H
Hongze Cheng 已提交
836
  pReader->fid = INT32_MIN;
H
Hongze Cheng 已提交
837 838

  code = tBlockDataCreate(&pReader->bData);
H
Hongze Cheng 已提交
839
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
840

H
Hongze Cheng 已提交
841 842
_exit:
  if (code) {
H
Hongze Cheng 已提交
843 844
    tsdbError("vgId:%d %s failed at line %d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(pTsdb->pVnode),
              __func__, lino, tstrerror(code), sver, ever, type);
H
Hongze Cheng 已提交
845 846
    if (pReader) {
      tBlockDataDestroy(&pReader->bData, 1);
H
Hongze Cheng 已提交
847
      tsdbFSUnref(pTsdb, &pReader->fs);
H
Hongze Cheng 已提交
848
      taosMemoryFree(pReader);
H
Hongze Cheng 已提交
849
      pReader = NULL;
H
Hongze Cheng 已提交
850 851
    }
  } else {
H
Hongze Cheng 已提交
852 853
    tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(pTsdb->pVnode), __func__, sver, ever,
             type);
H
Hongze Cheng 已提交
854
  }
H
Hongze Cheng 已提交
855
  *ppReader = pReader;
H
more  
Hongze Cheng 已提交
856
  return code;
H
Hongze Cheng 已提交
857 858
}

H
Hongze Cheng 已提交
859
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
H
Hongze Cheng 已提交
860 861
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
862

H
Hongze Cheng 已提交
863
  STsdbSnapReader* pReader = *ppReader;
H
Hongze Cheng 已提交
864

H
Hongze Cheng 已提交
865 866 867 868 869 870 871
  // tombstone
  if (pReader->pTIter) {
    tsdbCloseDataIter2(pReader->pTIter);
    pReader->pTIter = NULL;
  }
  if (pReader->pDelFReader) {
    tsdbDelFReaderClose(&pReader->pDelFReader);
H
Hongze Cheng 已提交
872
  }
H
Hongze Cheng 已提交
873

H
Hongze Cheng 已提交
874
  // timeseries
H
Hongze Cheng 已提交
875
  tBlockDataDestroy(&pReader->bData, 1);
H
Hongze Cheng 已提交
876 877 878 879 880 881 882 883
  while (pReader->iterList) {
    STsdbDataIter2* pIter = pReader->iterList;
    pReader->iterList = pIter->next;
    tsdbCloseDataIter2(pIter);
  }
  if (pReader->pDataFReader) {
    tsdbDataFReaderClose(&pReader->pDataFReader);
  }
H
Hongze Cheng 已提交
884

H
Hongze Cheng 已提交
885 886
  // other
  tDestroyTSchema(pReader->skmTable.pTSchema);
H
Hongze Cheng 已提交
887
  tsdbFSUnref(pReader->pTsdb, &pReader->fs);
H
Hongze Cheng 已提交
888 889 890
  for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(pReader->aBuf[0]); iBuf++) {
    tFree(pReader->aBuf[iBuf]);
  }
H
Hongze Cheng 已提交
891
  taosMemoryFree(pReader);
H
Hongze Cheng 已提交
892 893 894 895 896 897 898

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(pReader->pTsdb->pVnode), __func__);
  }
H
Hongze Cheng 已提交
899
  *ppReader = NULL;
H
Hongze Cheng 已提交
900 901 902 903
  return code;
}

int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
H
more  
Hongze Cheng 已提交
904
  int32_t code = 0;
H
add log  
Hongze Cheng 已提交
905
  int32_t lino = 0;
H
Hongze Cheng 已提交
906

H
Hongze Cheng 已提交
907 908
  *ppData = NULL;

H
Hongze Cheng 已提交
909
  // read data file
H
Hongze Cheng 已提交
910
  if (!pReader->dataDone) {
H
Hongze Cheng 已提交
911
    code = tsdbSnapReadTimeSeriesData(pReader, ppData);
H
add log  
Hongze Cheng 已提交
912 913 914
    TSDB_CHECK_CODE(code, lino, _exit);
    if (*ppData) {
      goto _exit;
H
Hongze Cheng 已提交
915
    } else {
H
add log  
Hongze Cheng 已提交
916
      pReader->dataDone = 1;
H
Hongze Cheng 已提交
917 918
    }
  }
H
Hongze Cheng 已提交
919 920

  // read del file
H
Hongze Cheng 已提交
921
  if (!pReader->delDone) {
H
Hongze Cheng 已提交
922
    code = tsdbSnapReadTombData(pReader, ppData);
H
add log  
Hongze Cheng 已提交
923 924 925
    TSDB_CHECK_CODE(code, lino, _exit);
    if (*ppData) {
      goto _exit;
H
Hongze Cheng 已提交
926
    } else {
H
add log  
Hongze Cheng 已提交
927
      pReader->delDone = 1;
H
Hongze Cheng 已提交
928 929
    }
  }
H
Hongze Cheng 已提交
930

H
Hongze Cheng 已提交
931
_exit:
H
add log  
Hongze Cheng 已提交
932
  if (code) {
H
Hongze Cheng 已提交
933
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
H
add log  
Hongze Cheng 已提交
934
  } else {
H
Hongze Cheng 已提交
935
    tsdbDebug("vgId:%d %s done", TD_VID(pReader->pTsdb->pVnode), __func__);
H
add log  
Hongze Cheng 已提交
936
  }
H
more  
Hongze Cheng 已提交
937 938 939
  return code;
}

H
Hongze Cheng 已提交
940
// STsdbSnapWriter ========================================
H
Hongze Cheng 已提交
941
struct STsdbSnapWriter {
H
Hongze Cheng 已提交
942 943 944
  STsdb*   pTsdb;
  int64_t  sver;
  int64_t  ever;
H
Hongze Cheng 已提交
945 946 947 948 949 950
  int32_t  minutes;
  int8_t   precision;
  int32_t  minRow;
  int32_t  maxRow;
  int8_t   cmprAlg;
  int64_t  commitID;
H
Hongze Cheng 已提交
951
  uint8_t* aBuf[5];
H
Hongze Cheng 已提交
952

H
Hongze Cheng 已提交
953
  STsdbFS fs;
H
Hongze Cheng 已提交
954
  TABLEID tbid;
H
Hongze Cheng 已提交
955

H
Hongze Cheng 已提交
956 957 958 959 960 961 962 963 964 965
  // time-series data
  SBlockData inData;

  int32_t  fid;
  SSkmInfo skmTable;

  /* reader */
  SDataFReader*   pDataFReader;
  STsdbDataIter2* iterList;
  STsdbDataIter2* pDIter;
H
Hongze Cheng 已提交
966
  STsdbDataIter2* pSIter;
H
Hongze Cheng 已提交
967 968 969 970 971 972 973 974 975 976 977
  SRBTree         rbt;  // SRBTree<STsdbDataIter2>

  /* writer */
  SDataFWriter* pDataFWriter;
  SArray*       aBlockIdx;
  SMapData      mDataBlk;  // SMapData<SDataBlk>
  SArray*       aSttBlk;   // SArray<SSttBlk>
  SBlockData    bData;
  SBlockData    sData;

  // tombstone data
H
Hongze Cheng 已提交
978 979 980 981 982
  /* reader */
  SDelFReader*    pDelFReader;
  STsdbDataIter2* pTIter;

  /* writer */
H
Hongze Cheng 已提交
983
  SDelFWriter* pDelFWriter;
H
Hongze Cheng 已提交
984
  SArray*      aDelIdx;
H
Hongze Cheng 已提交
985
  SArray*      aDelData;
H
Hongze Cheng 已提交
986 987
};

H
Hongze Cheng 已提交
988
// SNAP_DATA_TSDB
H
Hongze Cheng 已提交
989 990
static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pId) {
  int32_t code = 0;
H
Hongze Cheng 已提交
991 992
  int32_t lino = 0;

H
Hongze Cheng 已提交
993 994 995 996 997 998
  if (pId) {
    pWriter->tbid = *pId;
  } else {
    pWriter->tbid = (TABLEID){INT64_MAX, INT64_MAX};
  }

H
Hongze Cheng 已提交
999 1000
  if (pWriter->pDIter) {
    STsdbDataIter2* pIter = pWriter->pDIter;
H
Hongze Cheng 已提交
1001

H
Hongze Cheng 已提交
1002
    // assert last table data end
H
Hongze Cheng 已提交
1003 1004 1005
    ASSERT(pIter->dIter.iRow >= pIter->dIter.bData.nRow);
    ASSERT(pIter->dIter.iDataBlk >= pIter->dIter.mDataBlk.nItem);

H
Hongze Cheng 已提交
1006
    for (;;) {
H
Hongze Cheng 已提交
1007
      if (pIter->dIter.iBlockIdx >= taosArrayGetSize(pIter->dIter.aBlockIdx)) {
H
Hongze Cheng 已提交
1008 1009 1010 1011
        pWriter->pDIter = NULL;
        break;
      }

H
Hongze Cheng 已提交
1012
      SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->dIter.aBlockIdx, pIter->dIter.iBlockIdx);
H
Hongze Cheng 已提交
1013

H
Hongze Cheng 已提交
1014
      int32_t c = tTABLEIDCmprFn(pBlockIdx, &pWriter->tbid);
H
Hongze Cheng 已提交
1015 1016 1017 1018 1019 1020
      if (c < 0) {
        code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk);
        TSDB_CHECK_CODE(code, lino, _exit);

        SBlockIdx* pNewBlockIdx = taosArrayReserve(pWriter->aBlockIdx, 1);
        if (pNewBlockIdx == NULL) {
H
Hongze Cheng 已提交
1021
          code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1022 1023 1024 1025 1026 1027
          TSDB_CHECK_CODE(code, lino, _exit);
        }

        pNewBlockIdx->suid = pBlockIdx->suid;
        pNewBlockIdx->uid = pBlockIdx->uid;

H
Hongze Cheng 已提交
1028
        code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pIter->dIter.mDataBlk, pNewBlockIdx);
H
Hongze Cheng 已提交
1029 1030
        TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
1031 1032
        pIter->dIter.iBlockIdx++;
      } else if (c == 0) {
H
Hongze Cheng 已提交
1033 1034 1035
        code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk);
        TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
1036
        pIter->dIter.iDataBlk = 0;
H
Hongze Cheng 已提交
1037
        pIter->dIter.iBlockIdx++;
H
Hongze Cheng 已提交
1038 1039 1040

        break;
      } else {
H
Hongze Cheng 已提交
1041
        pIter->dIter.iDataBlk = pIter->dIter.mDataBlk.nItem;
H
Hongze Cheng 已提交
1042 1043 1044 1045
        break;
      }
    }
  }
H
Hongze Cheng 已提交
1046

H
Hongze Cheng 已提交
1047
  if (pId) {
H
Hongze Cheng 已提交
1048 1049
    code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pId->suid, pId->uid, &pWriter->skmTable);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1050

H
Hongze Cheng 已提交
1051
    tMapDataReset(&pWriter->mDataBlk);
H
Hongze Cheng 已提交
1052

H
Hongze Cheng 已提交
1053 1054
    code = tBlockDataInit(&pWriter->bData, pId, pWriter->skmTable.pTSchema, NULL, 0);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1055
  }
H
Hongze Cheng 已提交
1056

H
Hongze Cheng 已提交
1057 1058 1059 1060 1061 1062
  if (!TABLE_SAME_SCHEMA(pWriter->tbid.suid, pWriter->tbid.uid, pWriter->sData.suid, pWriter->sData.uid)) {
    if ((pWriter->sData.nRow > 0)) {
      code = tsdbWriteSttBlock(pWriter->pDataFWriter, &pWriter->sData, pWriter->aSttBlk, pWriter->cmprAlg);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

H
Hongze Cheng 已提交
1063 1064 1065 1066 1067
    if (pId) {
      TABLEID id = {.suid = pWriter->tbid.suid, .uid = pWriter->tbid.suid ? 0 : pWriter->tbid.uid};
      code = tBlockDataInit(&pWriter->sData, &id, pWriter->skmTable.pTSchema, NULL, 0);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
H
Hongze Cheng 已提交
1068
  }
H
Hongze Cheng 已提交
1069

H
Hongze Cheng 已提交
1070 1071 1072
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
1073
  } else {
H
Hongze Cheng 已提交
1074 1075
    tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), __func__,
              pWriter->tbid.suid, pWriter->tbid.uid);
H
Hongze Cheng 已提交
1076
  }
H
Hongze Cheng 已提交
1077 1078 1079
  return code;
}

H
Hongze Cheng 已提交
1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168
static int32_t tsdbSnapWriteTableRowImpl(STsdbSnapWriter* pWriter, TSDBROW* pRow) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tBlockDataAppendRow(&pWriter->bData, pRow, pWriter->skmTable.pTSchema, pWriter->tbid.uid);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (pWriter->bData.nRow >= pWriter->maxRow) {
    code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

static int32_t tsdbSnapWriteTableRow(STsdbSnapWriter* pWriter, TSDBROW* pRow) {
  int32_t code = 0;
  int32_t lino = 0;

  TSDBKEY inKey = pRow ? TSDBROW_KEY(pRow) : TSDBKEY_MAX;

  if (pWriter->pDIter == NULL || (pWriter->pDIter->dIter.iRow >= pWriter->pDIter->dIter.bData.nRow &&
                                  pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem)) {
    goto _write_row;
  } else {
    for (;;) {
      while (pWriter->pDIter->dIter.iRow < pWriter->pDIter->dIter.bData.nRow) {
        TSDBROW row = tsdbRowFromBlockData(&pWriter->pDIter->dIter.bData, pWriter->pDIter->dIter.iRow);

        int32_t c = tsdbKeyCmprFn(&inKey, &TSDBROW_KEY(&row));
        if (c < 0) {
          goto _write_row;
        } else if (c > 0) {
          code = tsdbSnapWriteTableRowImpl(pWriter, &row);
          TSDB_CHECK_CODE(code, lino, _exit);

          pWriter->pDIter->dIter.iRow++;
        } else {
          ASSERT(0);
        }
      }

      for (;;) {
        if (pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem) goto _write_row;

        // FIXME: Here can be slow, use array instead
        SDataBlk dataBlk;
        tMapDataGetItemByIdx(&pWriter->pDIter->dIter.mDataBlk, pWriter->pDIter->dIter.iDataBlk, &dataBlk, tGetDataBlk);

        int32_t c = tDataBlkCmprFn(&dataBlk, &(SDataBlk){.minKey = inKey, .maxKey = inKey});
        if (c > 0) {
          goto _write_row;
        } else if (c < 0) {
          if (pWriter->bData.nRow > 0) {
            code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg);
            TSDB_CHECK_CODE(code, lino, _exit);
          }

          tMapDataPutItem(&pWriter->pDIter->dIter.mDataBlk, &dataBlk, tPutDataBlk);
          pWriter->pDIter->dIter.iDataBlk++;
        } else {
          code = tsdbReadDataBlockEx(pWriter->pDataFReader, &dataBlk, &pWriter->pDIter->dIter.bData);
          TSDB_CHECK_CODE(code, lino, _exit);

          pWriter->pDIter->dIter.iRow = 0;
          pWriter->pDIter->dIter.iDataBlk++;
          break;
        }
      }
    }
  }

_write_row:
  if (pRow) {
    code = tsdbSnapWriteTableRowImpl(pWriter, pRow);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
1169 1170
static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
1171
  int32_t lino = 0;
H
Hongze Cheng 已提交
1172

H
Hongze Cheng 已提交
1173 1174 1175
  // write a NULL row to end current table data write
  code = tsdbSnapWriteTableRow(pWriter, NULL);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1176

H
Hongze Cheng 已提交
1177 1178
  if (pWriter->bData.nRow > 0) {
    if (pWriter->bData.nRow < pWriter->minRow) {
H
Hongze Cheng 已提交
1179
      ASSERT(TABLE_SAME_SCHEMA(pWriter->sData.suid, pWriter->sData.uid, pWriter->tbid.suid, pWriter->tbid.uid));
H
Hongze Cheng 已提交
1180
      for (int32_t iRow = 0; iRow < pWriter->bData.nRow; iRow++) {
H
Hongze Cheng 已提交
1181 1182
        code =
            tBlockDataAppendRow(&pWriter->sData, &tsdbRowFromBlockData(&pWriter->bData, iRow), NULL, pWriter->tbid.uid);
H
Hongze Cheng 已提交
1183
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1184

H
Hongze Cheng 已提交
1185 1186 1187 1188
        if (pWriter->sData.nRow >= pWriter->maxRow) {
          code = tsdbWriteSttBlock(pWriter->pDataFWriter, &pWriter->sData, pWriter->aSttBlk, pWriter->cmprAlg);
          TSDB_CHECK_CODE(code, lino, _exit);
        }
H
Hongze Cheng 已提交
1189 1190
      }

H
Hongze Cheng 已提交
1191
      tBlockDataClear(&pWriter->bData);
H
Hongze Cheng 已提交
1192 1193 1194
    } else {
      code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg);
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1195 1196 1197
    }
  }

H
Hongze Cheng 已提交
1198 1199 1200
  if (pWriter->mDataBlk.nItem) {
    SBlockIdx* pBlockIdx = taosArrayReserve(pWriter->aBlockIdx, 1);
    if (pBlockIdx == NULL) {
H
Hongze Cheng 已提交
1201
      code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1202
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1203 1204
    }

H
Hongze Cheng 已提交
1205 1206 1207
    pBlockIdx->suid = pWriter->tbid.suid;
    pBlockIdx->uid = pWriter->tbid.uid;

H
Hongze Cheng 已提交
1208 1209 1210
    code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pWriter->mDataBlk, pBlockIdx);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
1211

H
Hongze Cheng 已提交
1212 1213 1214 1215
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
H
Hongze Cheng 已提交
1216 1217 1218
  return code;
}

H
Hongze Cheng 已提交
1219
static int32_t tsdbSnapWriteFileDataStart(STsdbSnapWriter* pWriter, int32_t fid) {
H
Hongze Cheng 已提交
1220
  int32_t code = 0;
H
Hongze Cheng 已提交
1221 1222 1223
  int32_t lino = 0;

  ASSERT(pWriter->pDataFWriter == NULL && pWriter->fid < fid);
H
Hongze Cheng 已提交
1224

H
Hongze Cheng 已提交
1225
  STsdb* pTsdb = pWriter->pTsdb;
H
Hongze Cheng 已提交
1226 1227

  pWriter->fid = fid;
H
Hongze Cheng 已提交
1228
  pWriter->tbid = (TABLEID){0};
H
Hongze Cheng 已提交
1229 1230
  SDFileSet* pSet = taosArraySearch(pWriter->fs.aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ);

H
Hongze Cheng 已提交
1231 1232 1233 1234
  // open reader
  pWriter->pDataFReader = NULL;
  pWriter->iterList = NULL;
  pWriter->pDIter = NULL;
H
Hongze Cheng 已提交
1235
  pWriter->pSIter = NULL;
H
Hongze Cheng 已提交
1236
  tRBTreeCreate(&pWriter->rbt, tsdbDataIterCmprFn);
H
Hongze Cheng 已提交
1237
  if (pSet) {
H
Hongze Cheng 已提交
1238 1239
    code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1240

H
Hongze Cheng 已提交
1241 1242 1243 1244 1245 1246
    code = tsdbOpenDataFileDataIter(pWriter->pDataFReader, &pWriter->pDIter);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (pWriter->pDIter) {
      pWriter->pDIter->next = pWriter->iterList;
      pWriter->iterList = pWriter->pDIter;
    }
H
Hongze Cheng 已提交
1247 1248

    for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
H
Hongze Cheng 已提交
1249
      code = tsdbOpenSttFileDataIter(pWriter->pDataFReader, iStt, &pWriter->pSIter);
H
Hongze Cheng 已提交
1250 1251
      TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
1252
      if (pWriter->pSIter) {
H
Hongze Cheng 已提交
1253
        code = tsdbSttFileDataIterNext(pWriter->pSIter, NULL);
H
Hongze Cheng 已提交
1254 1255 1256
        TSDB_CHECK_CODE(code, lino, _exit);

        // add to tree
H
Hongze Cheng 已提交
1257
        tRBTreePut(&pWriter->rbt, &pWriter->pSIter->rbtn);
H
Hongze Cheng 已提交
1258 1259

        // add to list
H
Hongze Cheng 已提交
1260 1261
        pWriter->pSIter->next = pWriter->iterList;
        pWriter->iterList = pWriter->pSIter;
H
Hongze Cheng 已提交
1262
      }
H
Hongze Cheng 已提交
1263
    }
H
Hongze Cheng 已提交
1264

H
Hongze Cheng 已提交
1265
    pWriter->pSIter = NULL;
H
Hongze Cheng 已提交
1266 1267 1268 1269 1270 1271
  }

  // open writer
  SDiskID diskId;
  if (pSet) {
    diskId = pSet->diskId;
H
Hongze Cheng 已提交
1272
  } else {
H
Hongze Cheng 已提交
1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290
    tfsAllocDisk(pTsdb->pVnode->pTfs, 0 /*TODO*/, &diskId);
    tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, diskId);
  }
  SDFileSet wSet = {.diskId = diskId,
                    .fid = fid,
                    .pHeadF = &(SHeadFile){.commitID = pWriter->commitID},
                    .pDataF = (pSet) ? pSet->pDataF : &(SDataFile){.commitID = pWriter->commitID},
                    .pSmaF = (pSet) ? pSet->pSmaF : &(SSmaFile){.commitID = pWriter->commitID},
                    .nSttF = 1,
                    .aSttF = {&(SSttFile){.commitID = pWriter->commitID}}};
  code = tsdbDataFWriterOpen(&pWriter->pDataFWriter, pTsdb, &wSet);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (pWriter->aBlockIdx) {
    taosArrayClear(pWriter->aBlockIdx);
  } else if ((pWriter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1291
  }
H
Hongze Cheng 已提交
1292

H
Hongze Cheng 已提交
1293
  tMapDataReset(&pWriter->mDataBlk);
H
Hongze Cheng 已提交
1294

H
Hongze Cheng 已提交
1295 1296 1297 1298 1299 1300
  if (pWriter->aSttBlk) {
    taosArrayClear(pWriter->aSttBlk);
  } else if ((pWriter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk))) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
1301

H
Hongze Cheng 已提交
1302
  tBlockDataReset(&pWriter->bData);
H
Hongze Cheng 已提交
1303
  tBlockDataReset(&pWriter->sData);
H
Hongze Cheng 已提交
1304 1305 1306 1307 1308 1309 1310 1311

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code),
              fid);
  } else {
    tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(pTsdb->pVnode), __func__, fid);
  }
H
Hongze Cheng 已提交
1312 1313 1314
  return code;
}

H
Hongze Cheng 已提交
1315 1316 1317 1318 1319 1320
static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  // switch to new table if need
  if (pRowInfo == NULL || pRowInfo->uid != pWriter->tbid.uid) {
H
Hongze Cheng 已提交
1321
    if (pWriter->tbid.uid) {
H
Hongze Cheng 已提交
1322 1323 1324 1325 1326 1327 1328 1329
      code = tsdbSnapWriteTableDataEnd(pWriter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbSnapWriteTableDataStart(pWriter, (TABLEID*)pRowInfo);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

H
Hongze Cheng 已提交
1330 1331 1332 1333
  if (pRowInfo == NULL) goto _exit;

  code = tsdbSnapWriteTableRow(pWriter, &pRowInfo->row);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1334 1335 1336 1337 1338 1339 1340 1341

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
1342
static int32_t tsdbSnapWriteNextRow(STsdbSnapWriter* pWriter, SRowInfo** ppRowInfo) {
H
Hongze Cheng 已提交
1343
  int32_t code = 0;
H
Hongze Cheng 已提交
1344
  int32_t lino = 0;
H
Hongze Cheng 已提交
1345

H
Hongze Cheng 已提交
1346
  if (pWriter->pSIter) {
H
Hongze Cheng 已提交
1347
    code = tsdbDataIterNext2(pWriter->pSIter, NULL);
H
Hongze Cheng 已提交
1348
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1349

H
Hongze Cheng 已提交
1350 1351
    if (pWriter->pSIter->rowInfo.suid == 0 && pWriter->pSIter->rowInfo.uid == 0) {
      pWriter->pSIter = NULL;
H
Hongze Cheng 已提交
1352 1353 1354
    } else {
      SRBTreeNode* pNode = tRBTreeMin(&pWriter->rbt);
      if (pNode) {
H
Hongze Cheng 已提交
1355
        int32_t c = tsdbDataIterCmprFn(&pWriter->pSIter->rbtn, pNode);
H
Hongze Cheng 已提交
1356
        if (c > 0) {
H
Hongze Cheng 已提交
1357 1358
          tRBTreePut(&pWriter->rbt, &pWriter->pSIter->rbtn);
          pWriter->pSIter = NULL;
H
Hongze Cheng 已提交
1359 1360 1361 1362 1363
        } else if (c == 0) {
          ASSERT(0);
        }
      }
    }
H
Hongze Cheng 已提交
1364 1365
  }

H
Hongze Cheng 已提交
1366
  if (pWriter->pSIter == NULL) {
H
Hongze Cheng 已提交
1367 1368 1369
    SRBTreeNode* pNode = tRBTreeMin(&pWriter->rbt);
    if (pNode) {
      tRBTreeDrop(&pWriter->rbt, pNode);
H
Hongze Cheng 已提交
1370
      pWriter->pSIter = TSDB_RBTN_TO_DATA_ITER(pNode);
H
Hongze Cheng 已提交
1371
    }
H
Hongze Cheng 已提交
1372 1373
  }

H
Hongze Cheng 已提交
1374
  if (ppRowInfo) {
H
Hongze Cheng 已提交
1375 1376
    if (pWriter->pSIter) {
      *ppRowInfo = &pWriter->pSIter->rowInfo;
H
Hongze Cheng 已提交
1377 1378 1379
    } else {
      *ppRowInfo = NULL;
    }
H
Hongze Cheng 已提交
1380
  }
H
Hongze Cheng 已提交
1381

H
Hongze Cheng 已提交
1382 1383 1384
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
1385
  }
H
Hongze Cheng 已提交
1386 1387 1388 1389 1390 1391 1392
  return code;
}

static int32_t tsdbSnapWriteGetRow(STsdbSnapWriter* pWriter, SRowInfo** ppRowInfo) {
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
1393 1394
  if (pWriter->pSIter) {
    *ppRowInfo = &pWriter->pSIter->rowInfo;
H
Hongze Cheng 已提交
1395 1396 1397 1398 1399
    goto _exit;
  }

  code = tsdbSnapWriteNextRow(pWriter, ppRowInfo);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1400

H
Hongze Cheng 已提交
1401
_exit:
H
Hongze Cheng 已提交
1402 1403 1404
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
H
Hongze Cheng 已提交
1405
  return code;
H
Hongze Cheng 已提交
1406
}
H
Hongze Cheng 已提交
1407

H
Hongze Cheng 已提交
1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464
static int32_t tsdbSnapWriteFileDataEnd(STsdbSnapWriter* pWriter) {
  int32_t code = 0;
  int32_t lino = 0;

  ASSERT(pWriter->pDataFWriter);

  // consume remain data and end with a NULL table row
  SRowInfo* pRowInfo;
  code = tsdbSnapWriteGetRow(pWriter, &pRowInfo);
  TSDB_CHECK_CODE(code, lino, _exit);
  for (;;) {
    code = tsdbSnapWriteTableData(pWriter, pRowInfo);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (pRowInfo == NULL) break;

    code = tsdbSnapWriteNextRow(pWriter, &pRowInfo);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // do file-level updates
  code = tsdbWriteSttBlk(pWriter->pDataFWriter, pWriter->aSttBlk);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdx);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbUpdateDFileSetHeader(pWriter->pDataFWriter);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbFSUpsertFSet(&pWriter->fs, &pWriter->pDataFWriter->wSet);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 1);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (pWriter->pDataFReader) {
    code = tsdbDataFReaderClose(&pWriter->pDataFReader);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // clear sources
  while (pWriter->iterList) {
    STsdbDataIter2* pIter = pWriter->iterList;
    pWriter->iterList = pIter->next;
    tsdbCloseDataIter2(pIter);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s is done", TD_VID(pWriter->pTsdb->pVnode), __func__);
  }
  return code;
}

H
Hongze Cheng 已提交
1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477
static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tDecmprBlockData(pHdr->data, pHdr->size, &pWriter->inData, pWriter->aBuf);
  TSDB_CHECK_CODE(code, lino, _exit);

  ASSERT(pWriter->inData.nRow > 0);

  // switch to new data file if need
  int32_t fid = tsdbKeyFid(pWriter->inData.aTSKEY[0], pWriter->minutes, pWriter->precision);
  if (pWriter->fid != fid) {
    if (pWriter->pDataFWriter) {
H
Hongze Cheng 已提交
1478
      code = tsdbSnapWriteFileDataEnd(pWriter);
H
Hongze Cheng 已提交
1479 1480 1481
      TSDB_CHECK_CODE(code, lino, _exit);
    }

H
Hongze Cheng 已提交
1482
    code = tsdbSnapWriteFileDataStart(pWriter, fid);
H
Hongze Cheng 已提交
1483 1484 1485 1486 1487 1488 1489
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // loop write each row
  SRowInfo* pRowInfo;
  code = tsdbSnapWriteGetRow(pWriter, &pRowInfo);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1490
  for (int32_t iRow = 0; iRow < pWriter->inData.nRow; ++iRow) {
H
Hongze Cheng 已提交
1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525
    SRowInfo rInfo = {.suid = pWriter->inData.suid,
                      .uid = pWriter->inData.uid ? pWriter->inData.uid : pWriter->inData.aUid[iRow],
                      .row = tsdbRowFromBlockData(&pWriter->inData, iRow)};

    for (;;) {
      if (pRowInfo == NULL) {
        code = tsdbSnapWriteTableData(pWriter, &rInfo);
        TSDB_CHECK_CODE(code, lino, _exit);
        break;
      } else {
        int32_t c = tRowInfoCmprFn(&rInfo, pRowInfo);
        if (c < 0) {
          code = tsdbSnapWriteTableData(pWriter, &rInfo);
          TSDB_CHECK_CODE(code, lino, _exit);
          break;
        } else if (c > 0) {
          code = tsdbSnapWriteTableData(pWriter, pRowInfo);
          TSDB_CHECK_CODE(code, lino, _exit);

          code = tsdbSnapWriteNextRow(pWriter, &pRowInfo);
          TSDB_CHECK_CODE(code, lino, _exit);
        } else {
          ASSERT(0);
        }
      }
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64 " nRow:%d", TD_VID(pWriter->pTsdb->pVnode), __func__,
              pWriter->inData.suid, pWriter->inData.uid, pWriter->inData.nRow);
  }
H
Hongze Cheng 已提交
1526 1527 1528
  return code;
}

H
Hongze Cheng 已提交
1529
// SNAP_DATA_DEL
H
Hongze Cheng 已提交
1530
static int32_t tsdbSnapWriteDelTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pId) {
H
Hongze Cheng 已提交
1531
  int32_t code = 0;
H
Hongze Cheng 已提交
1532
  int32_t lino = 0;
H
Hongze Cheng 已提交
1533

H
Hongze Cheng 已提交
1534 1535 1536 1537 1538
  if (pId) {
    pWriter->tbid = *pId;
  } else {
    pWriter->tbid = (TABLEID){.suid = INT64_MAX, .uid = INT64_MAX};
  }
H
Hongze Cheng 已提交
1539

H
Hongze Cheng 已提交
1540
  taosArrayClear(pWriter->aDelData);
H
Hongze Cheng 已提交
1541

H
Hongze Cheng 已提交
1542 1543
  if (pWriter->pTIter) {
    while (pWriter->pTIter->tIter.iDelIdx < taosArrayGetSize(pWriter->pTIter->tIter.aDelIdx)) {
H
Hongze Cheng 已提交
1544
      SDelIdx* pDelIdx = taosArrayGet(pWriter->pTIter->tIter.aDelIdx, pWriter->pTIter->tIter.iDelIdx);
H
Hongze Cheng 已提交
1545

H
Hongze Cheng 已提交
1546 1547 1548 1549
      int32_t c = tTABLEIDCmprFn(pDelIdx, &pWriter->tbid);
      if (c < 0) {
        code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->pTIter->tIter.aDelData);
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1550

H
Hongze Cheng 已提交
1551
        SDelIdx* pDelIdxNew = taosArrayReserve(pWriter->aDelIdx, 1);
H
Hongze Cheng 已提交
1552 1553 1554 1555 1556 1557 1558
        if (pDelIdxNew == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          TSDB_CHECK_CODE(code, lino, _exit);
        }

        pDelIdxNew->suid = pDelIdx->suid;
        pDelIdxNew->uid = pDelIdx->uid;
H
Hongze Cheng 已提交
1559

H
Hongze Cheng 已提交
1560 1561 1562 1563 1564
        code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->pTIter->tIter.aDelData, pDelIdxNew);
        TSDB_CHECK_CODE(code, lino, _exit);

        pWriter->pTIter->tIter.iDelIdx++;
      } else if (c == 0) {
H
Hongze Cheng 已提交
1565
        code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData);
H
Hongze Cheng 已提交
1566 1567 1568 1569 1570 1571 1572 1573
        TSDB_CHECK_CODE(code, lino, _exit);

        pWriter->pTIter->tIter.iDelIdx++;
        break;
      } else {
        break;
      }
    }
H
Hongze Cheng 已提交
1574 1575 1576
  }

_exit:
H
Hongze Cheng 已提交
1577 1578 1579 1580 1581 1582
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), __func__, pId->suid,
              pId->uid);
  }
H
Hongze Cheng 已提交
1583 1584 1585
  return code;
}

H
Hongze Cheng 已提交
1586
static int32_t tsdbSnapWriteDelTableDataEnd(STsdbSnapWriter* pWriter) {
H
Hongze Cheng 已提交
1587
  int32_t code = 0;
H
Hongze Cheng 已提交
1588
  int32_t lino = 0;
H
Hongze Cheng 已提交
1589

H
Hongze Cheng 已提交
1590 1591 1592 1593 1594 1595
  if (taosArrayGetSize(pWriter->aDelData) > 0) {
    SDelIdx* pDelIdx = taosArrayReserve(pWriter->aDelIdx, 1);
    if (pDelIdx == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
H
Hongze Cheng 已提交
1596

H
Hongze Cheng 已提交
1597 1598
    pDelIdx->suid = pWriter->tbid.suid;
    pDelIdx->uid = pWriter->tbid.uid;
H
Hongze Cheng 已提交
1599

H
Hongze Cheng 已提交
1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620
    code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, pDelIdx);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbTrace("vgId:%d %s done", TD_VID(pWriter->pTsdb->pVnode), __func__);
  }
  return code;
}

static int32_t tsdbSnapWriteDelTableData(STsdbSnapWriter* pWriter, TABLEID* pId, uint8_t* pData, int64_t size) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pId == NULL || pId->uid != pWriter->tbid.uid) {
    if (pWriter->tbid.uid) {
      code = tsdbSnapWriteDelTableDataEnd(pWriter);
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1621 1622
    }

H
Hongze Cheng 已提交
1623 1624
    code = tsdbSnapWriteDelTableDataStart(pWriter, pId);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1625 1626
  }

H
Hongze Cheng 已提交
1627
  if (pId == NULL) goto _exit;
H
Hongze Cheng 已提交
1628

H
Hongze Cheng 已提交
1629 1630 1631 1632
  int64_t n = 0;
  while (n < size) {
    SDelData delData;
    n += tGetDelData(pData + n, &delData);
H
Hongze Cheng 已提交
1633

H
Hongze Cheng 已提交
1634 1635 1636 1637 1638 1639
    if (taosArrayPush(pWriter->aDelData, &delData) < 0) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }
  ASSERT(n == size);
H
Hongze Cheng 已提交
1640

H
Hongze Cheng 已提交
1641 1642 1643
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
1644
  }
H
Hongze Cheng 已提交
1645 1646
  return code;
}
H
Hongze Cheng 已提交
1647

H
Hongze Cheng 已提交
1648 1649 1650
static int32_t tsdbSnapWriteDelDataStart(STsdbSnapWriter* pWriter) {
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
1651

H
Hongze Cheng 已提交
1652 1653
  STsdb*    pTsdb = pWriter->pTsdb;
  SDelFile* pDelFile = pWriter->fs.pDelFile;
H
Hongze Cheng 已提交
1654

H
Hongze Cheng 已提交
1655 1656 1657 1658 1659 1660 1661 1662 1663
  pWriter->tbid = (TABLEID){0};

  // reader
  if (pDelFile) {
    code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pDelFile, pTsdb);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbOpenTombFileDataIter(pWriter->pDelFReader, &pWriter->pTIter);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1664
  }
H
Hongze Cheng 已提交
1665

H
Hongze Cheng 已提交
1666 1667 1668
  // writer
  code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &(SDelFile){.commitID = pWriter->commitID}, pTsdb);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1669

H
Hongze Cheng 已提交
1670
  if ((pWriter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) {
H
Hongze Cheng 已提交
1671
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1672 1673 1674 1675 1676
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  if ((pWriter->aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1677 1678
  }

H
Hongze Cheng 已提交
1679 1680 1681 1682 1683 1684
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
  }
H
Hongze Cheng 已提交
1685 1686 1687
  return code;
}

H
Hongze Cheng 已提交
1688
static int32_t tsdbSnapWriteDelDataEnd(STsdbSnapWriter* pWriter) {
H
Hongze Cheng 已提交
1689
  int32_t code = 0;
H
Hongze Cheng 已提交
1690
  int32_t lino = 0;
H
Hongze Cheng 已提交
1691

H
Hongze Cheng 已提交
1692
  STsdb* pTsdb = pWriter->pTsdb;
H
Hongze Cheng 已提交
1693

H
Hongze Cheng 已提交
1694
  // end remaining table with NULL data
H
Hongze Cheng 已提交
1695 1696
  code = tsdbSnapWriteDelTableData(pWriter, NULL, NULL, 0);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1697

H
Hongze Cheng 已提交
1698 1699 1700
  // update file-level info
  code = tsdbWriteDelIdx(pWriter->pDelFWriter, pWriter->aDelIdx);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1701

H
Hongze Cheng 已提交
1702
  code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter);
H
Hongze Cheng 已提交
1703
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1704

H
Hongze Cheng 已提交
1705
  code = tsdbFSUpsertDelFile(&pWriter->fs, &pWriter->pDelFWriter->fDel);
H
Hongze Cheng 已提交
1706
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1707 1708

  code = tsdbDelFWriterClose(&pWriter->pDelFWriter, 1);
H
Hongze Cheng 已提交
1709
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1710 1711 1712

  if (pWriter->pDelFReader) {
    code = tsdbDelFReaderClose(&pWriter->pDelFReader);
H
Hongze Cheng 已提交
1713
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1714 1715
  }

H
Hongze Cheng 已提交
1716 1717 1718 1719 1720
  if (pWriter->pTIter) {
    tsdbCloseDataIter2(pWriter->pTIter);
    pWriter->pTIter = NULL;
  }

H
Hongze Cheng 已提交
1721 1722 1723 1724 1725 1726
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
  }
H
Hongze Cheng 已提交
1727
  return code;
H
Hongze Cheng 已提交
1728
}
H
Hongze Cheng 已提交
1729

H
Hongze Cheng 已提交
1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741
static int32_t tsdbSnapWriteDelData(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdb* pTsdb = pWriter->pTsdb;

  // start to write del data if need
  if (pWriter->pDelFWriter == NULL) {
    code = tsdbSnapWriteDelDataStart(pWriter);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

H
Hongze Cheng 已提交
1742
  // do write del data
H
Hongze Cheng 已提交
1743 1744 1745 1746 1747 1748 1749 1750 1751 1752
  code = tsdbSnapWriteDelTableData(pWriter, (TABLEID*)pHdr->data, pHdr->data + sizeof(TABLEID),
                                   pHdr->size - sizeof(TABLEID));
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed since %s", TD_VID(pTsdb->pVnode), __func__, tstrerror(code));
  } else {
    tsdbTrace("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
  }
H
Hongze Cheng 已提交
1753 1754 1755
  return code;
}

H
Hongze Cheng 已提交
1756
// APIs
H
Hongze Cheng 已提交
1757
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) {
H
Hongze Cheng 已提交
1758 1759
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
1760 1761

  // alloc
H
Hongze Cheng 已提交
1762
  STsdbSnapWriter* pWriter = (STsdbSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
H
Hongze Cheng 已提交
1763 1764
  if (pWriter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1765
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1766 1767 1768 1769
  }
  pWriter->pTsdb = pTsdb;
  pWriter->sver = sver;
  pWriter->ever = ever;
H
Hongze Cheng 已提交
1770 1771 1772 1773 1774
  pWriter->minutes = pTsdb->keepCfg.days;
  pWriter->precision = pTsdb->keepCfg.precision;
  pWriter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
  pWriter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
  pWriter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
H
Hongze Cheng 已提交
1775 1776
  pWriter->commitID = pTsdb->pVnode->state.commitID;

H
Hongze Cheng 已提交
1777 1778 1779
  code = tsdbFSCopy(pTsdb, &pWriter->fs);
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
1780
  // SNAP_DATA_TSDB
H
Hongze Cheng 已提交
1781 1782 1783
  code = tBlockDataCreate(&pWriter->inData);
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
1784
  pWriter->fid = INT32_MIN;
H
Hongze Cheng 已提交
1785

H
Hongze Cheng 已提交
1786
  code = tBlockDataCreate(&pWriter->bData);
H
Hongze Cheng 已提交
1787
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1788 1789

  code = tBlockDataCreate(&pWriter->sData);
H
Hongze Cheng 已提交
1790
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1791

H
Hongze Cheng 已提交
1792
  // SNAP_DATA_DEL
H
Hongze Cheng 已提交
1793

H
Hongze Cheng 已提交
1794 1795
_exit:
  if (code) {
H
Hongze Cheng 已提交
1796
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
1797
    if (pWriter) {
H
Hongze Cheng 已提交
1798
      tBlockDataDestroy(&pWriter->sData, 1);
H
Hongze Cheng 已提交
1799
      tBlockDataDestroy(&pWriter->bData, 1);
H
Hongze Cheng 已提交
1800
      tBlockDataDestroy(&pWriter->inData, 1);
H
Hongze Cheng 已提交
1801
      tsdbFSDestroy(&pWriter->fs);
H
Hongze Cheng 已提交
1802
      pWriter = NULL;
H
Hongze Cheng 已提交
1803 1804
    }
  } else {
H
Hongze Cheng 已提交
1805
    tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64, TD_VID(pTsdb->pVnode), __func__, sver, ever);
H
Hongze Cheng 已提交
1806
  }
H
Hongze Cheng 已提交
1807
  *ppWriter = pWriter;
H
Hongze Cheng 已提交
1808 1809 1810
  return code;
}

H
Hongze Cheng 已提交
1811 1812
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
1813 1814
  int32_t lino = 0;

H
Hongze Cheng 已提交
1815
  if (pWriter->pDataFWriter) {
H
Hongze Cheng 已提交
1816
    code = tsdbSnapWriteFileDataEnd(pWriter);
H
Hongze Cheng 已提交
1817
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1818 1819
  }

H
Hongze Cheng 已提交
1820 1821 1822 1823
  if (pWriter->pDelFWriter) {
    code = tsdbSnapWriteDelDataEnd(pWriter);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
1824 1825

  code = tsdbFSPrepareCommit(pWriter->pTsdb, &pWriter->fs);
H
Hongze Cheng 已提交
1826
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1827 1828 1829

_exit:
  if (code) {
H
Hongze Cheng 已提交
1830 1831 1832
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(pWriter->pTsdb->pVnode), __func__);
H
Hongze Cheng 已提交
1833 1834 1835 1836
  }
  return code;
}

H
Hongze Cheng 已提交
1837
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
H
Hongze Cheng 已提交
1838
  int32_t          code = 0;
H
Hongze Cheng 已提交
1839
  STsdbSnapWriter* pWriter = *ppWriter;
H
Hongze Cheng 已提交
1840
  STsdb*           pTsdb = pWriter->pTsdb;
H
Hongze Cheng 已提交
1841 1842

  if (rollback) {
H
Hongze Cheng 已提交
1843
    tsdbRollbackCommit(pWriter->pTsdb);
H
Hongze Cheng 已提交
1844
  } else {
H
Hongze Cheng 已提交
1845 1846 1847
    // lock
    taosThreadRwlockWrlock(&pTsdb->rwLock);

H
Hongze Cheng 已提交
1848
    code = tsdbFSCommit(pWriter->pTsdb);
H
Hongze Cheng 已提交
1849 1850 1851 1852 1853 1854 1855
    if (code) {
      taosThreadRwlockUnlock(&pTsdb->rwLock);
      goto _err;
    }

    // unlock
    taosThreadRwlockUnlock(&pTsdb->rwLock);
H
Hongze Cheng 已提交
1856 1857
  }

H
Hongze Cheng 已提交
1858 1859
  // SNAP_DATA_DEL
  taosArrayDestroy(pWriter->aDelData);
H
Hongze Cheng 已提交
1860
  taosArrayDestroy(pWriter->aDelIdx);
H
Hongze Cheng 已提交
1861 1862

  // SNAP_DATA_TSDB
H
Hongze Cheng 已提交
1863
  tBlockDataDestroy(&pWriter->sData, 1);
H
Hongze Cheng 已提交
1864
  tBlockDataDestroy(&pWriter->bData, 1);
H
Hongze Cheng 已提交
1865 1866 1867
  taosArrayDestroy(pWriter->aSttBlk);
  tMapDataClear(&pWriter->mDataBlk);
  taosArrayDestroy(pWriter->aBlockIdx);
1868
  tDestroyTSchema(pWriter->skmTable.pTSchema);
H
Hongze Cheng 已提交
1869
  tBlockDataDestroy(&pWriter->inData, 1);
H
Hongze Cheng 已提交
1870

H
Hongze Cheng 已提交
1871 1872 1873
  for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t*); iBuf++) {
    tFree(pWriter->aBuf[iBuf]);
  }
S
Shengliang Guan 已提交
1874
  tsdbInfo("vgId:%d, %s done", TD_VID(pWriter->pTsdb->pVnode), __func__);
H
Hongze Cheng 已提交
1875 1876 1877 1878 1879
  taosMemoryFree(pWriter);
  *ppWriter = NULL;
  return code;

_err:
S
Shengliang Guan 已提交
1880
  tsdbError("vgId:%d, vnode snapshot tsdb writer close for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
C
Cary Xu 已提交
1881 1882 1883
            pWriter->pTsdb->path, tstrerror(code));
  taosMemoryFree(pWriter);
  *ppWriter = NULL;
H
Hongze Cheng 已提交
1884 1885 1886
  return code;
}

H
Hongze Cheng 已提交
1887 1888 1889
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr) {
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
1890

C
Cary Xu 已提交
1891
  if (pHdr->type == SNAP_DATA_TSDB) {
H
Hongze Cheng 已提交
1892 1893
    code = tsdbSnapWriteTimeSeriesData(pWriter, pHdr);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1894
    goto _exit;
H
Hongze Cheng 已提交
1895
  } else if (pWriter->pDataFWriter) {
H
Hongze Cheng 已提交
1896
    code = tsdbSnapWriteFileDataEnd(pWriter);
H
Hongze Cheng 已提交
1897
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1898 1899
  }

C
Cary Xu 已提交
1900
  if (pHdr->type == SNAP_DATA_DEL) {
H
Hongze Cheng 已提交
1901 1902 1903
    code = tsdbSnapWriteDelData(pWriter, pHdr);
    TSDB_CHECK_CODE(code, lino, _exit);
    goto _exit;
H
Hongze Cheng 已提交
1904 1905
  }

H
Hongze Cheng 已提交
1906
_exit:
H
Hongze Cheng 已提交
1907 1908 1909 1910 1911 1912 1913
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s, type:%d index:%" PRId64 " size:%" PRId64,
              TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code), pHdr->type, pHdr->index, pHdr->size);
  } else {
    tsdbDebug("vgId:%d %s done, type:%d index:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), __func__,
              pHdr->type, pHdr->index, pHdr->size);
  }
H
Hongze Cheng 已提交
1914 1915
  return code;
}