tsdbSnapshot.c 59.0 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

H
Hongze Cheng 已提交
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291
// Loads the data block described by pDataBlk, read through pReader, into pBlockData (rows are then read
// from pBlockData by the iterators below). Defined in another translation unit — presumably the
// data-file reader; TODO confirm why it is not declared in tsdb.h.
extern int32_t tsdbReadDataBlockEx(SDataFReader* pReader, SDataBlk* pDataBlk, SBlockData* pBlockData);

// STsdbDataIter2 ========================================
// Values of STsdbDataIter2::type; each selects the matching member of the iterator's union.
#define TSDB_MEM_TABLE_DATA_ITER 0
#define TSDB_DATA_FILE_DATA_ITER 1
#define TSDB_STT_FILE_DATA_ITER  2

// Row iterator over a single tsdb data source. Iterators can be chained through `next` and merged
// through `rbtn` in an SRBTree ordered by `rowInfo` (see tsdbDataIterCmprFn).
typedef struct STsdbDataIter2 STsdbDataIter2;
struct STsdbDataIter2 {
  STsdbDataIter2* next;  // singly-linked list link (e.g. STsdbSnapWriter.iterList)
  SRBTreeNode     rbtn;  // merge-tree node; map back to the iterator with TSDB_RBTN_TO_DATA_ITER

  int32_t  type;     // TSDB_*_DATA_ITER; selects the active union member
  SRowInfo rowInfo;  // current row; reset to all-zero once the iterator is exhausted
  union {
    // TSDB_MEM_TABLE_DATA_ITER (not implemented yet: the close/next dispatchers ASSERT on it)
    struct {
      SMemTable* pMemTable;
    } mIter;

    // TSDB_DATA_FILE_DATA_ITER: walks aBlockIdx -> mDataBlk -> rows of bData.
    // All three cursors start at -1 and are pre-incremented before use.
    struct {
      SDataFReader* pReader;
      SArray*       aBlockIdx;  // SArray<SBlockIdx>
      SMapData      mDataBlk;   // SDataBlk entries of the table at aBlockIdx[iBlockIdx]
      SBlockData    bData;      // block at mDataBlk[iDataBlk]
      int32_t       iBlockIdx;
      int32_t       iDataBlk;
      int32_t       iRow;

    } dIter;

    // TSDB_STT_FILE_DATA_ITER: walks aSttBlk -> rows of bData for .stt file number iStt.
    // Both cursors start at -1 and are pre-incremented before use.
    struct {
      SDataFReader* pReader;
      int32_t       iStt;     // index of the .stt file inside the file set
      SArray*       aSttBlk;  // SArray<SSttBlk>
      SBlockData    bData;    // block at aSttBlk[iSttBlk]
      int32_t       iSttBlk;
      int32_t       iRow;
    } sIter;
  };
};

// Recover the owning STsdbDataIter2 from a pointer to its embedded `rbtn` member (container-of).
// The argument is parenthesized so that expression arguments (e.g. `p + 1`) are cast as a whole.
#define TSDB_RBTN_TO_DATA_ITER(pNode) ((STsdbDataIter2*)(((char*)(pNode)) - offsetof(STsdbDataIter2, rbtn)))

/* open */
// Create an iterator over all rows of the .data file opened by pReader.
// The block index (SArray<SBlockIdx>) is loaded eagerly. All cursors start at -1, so the iterator must
// be advanced with tsdbDataIterNext2() before rowInfo is meaningful.
// On success *ppIter receives the new iterator, or NULL (with a 0 return) when the file has no SBlockIdx.
// On failure everything allocated here is released, *ppIter is set to NULL and the error is logged.
static int32_t tsdbOpenDataFileDataIter(SDataFReader* pReader, STsdbDataIter2** ppIter) {
  int32_t code = 0;
  int32_t lino = 0;

  // create handle
  STsdbDataIter2* pIter = (STsdbDataIter2*)taosMemoryCalloc(1, sizeof(*pIter));
  if (pIter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  pIter->type = TSDB_DATA_FILE_DATA_ITER;
  pIter->dIter.pReader = pReader;
  if ((pIter->dIter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tBlockDataCreate(&pIter->dIter.bData);
  TSDB_CHECK_CODE(code, lino, _exit);

  pIter->dIter.iBlockIdx = -1;
  pIter->dIter.iDataBlk = -1;
  pIter->dIter.iRow = -1;

  // read data
  code = tsdbReadBlockIdx(pReader, pIter->dIter.aBlockIdx);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  // Release the handle on failure, and also when the file holds nothing to iterate (empty block index).
  // `code ||` short-circuits so aBlockIdx is only inspected once it is known to be allocated.
  if (pIter && (code || taosArrayGetSize(pIter->dIter.aBlockIdx) == 0)) {
    tBlockDataDestroy(&pIter->dIter.bData, 1);
    taosArrayDestroy(pIter->dIter.aBlockIdx);
    taosMemoryFree(pIter);
    pIter = NULL;
  }
  *ppIter = pIter;
  return code;
}

// Create an iterator over all rows of .stt file number iStt of the file set opened by pReader.
// The stt block list (SArray<SSttBlk>) is loaded eagerly. Both cursors start at -1, so the iterator must
// be advanced with tsdbDataIterNext2() before rowInfo is meaningful.
// On success *ppIter receives the new iterator, or NULL (with a 0 return) when the file has no stt block.
// On failure everything allocated here is released, *ppIter is set to NULL and the error is logged.
static int32_t tsdbOpenSttFileDataIter(SDataFReader* pReader, int32_t iStt, STsdbDataIter2** ppIter) {
  int32_t code = 0;
  int32_t lino = 0;

  // create handle
  STsdbDataIter2* pIter = (STsdbDataIter2*)taosMemoryCalloc(1, sizeof(*pIter));
  if (pIter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  pIter->type = TSDB_STT_FILE_DATA_ITER;
  pIter->sIter.pReader = pReader;
  pIter->sIter.iStt = iStt;
  pIter->sIter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
  if (pIter->sIter.aSttBlk == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tBlockDataCreate(&pIter->sIter.bData);
  TSDB_CHECK_CODE(code, lino, _exit);

  pIter->sIter.iSttBlk = -1;
  pIter->sIter.iRow = -1;

  // read data
  code = tsdbReadSttBlk(pReader, iStt, pIter->sIter.aSttBlk);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  // Release the handle on failure, and also when the file holds nothing to iterate (no stt block).
  // `code ||` short-circuits so aSttBlk is only inspected once it is known to be allocated.
  if (pIter && (code || taosArrayGetSize(pIter->sIter.aSttBlk) == 0)) {
    taosArrayDestroy(pIter->sIter.aSttBlk);
    tBlockDataDestroy(&pIter->sIter.bData, 1);
    taosMemoryFree(pIter);
    pIter = NULL;
  }
  *ppIter = pIter;
  return code;
}

/* close */
// Free a .data-file iterator: its block index, its SDataBlk map, its block buffer, and finally the handle.
static void tsdbCloseDataFileDataIter(STsdbDataIter2* pIter) {
  taosArrayDestroy(pIter->dIter.aBlockIdx);
  tMapDataClear(&pIter->dIter.mDataBlk);
  tBlockDataDestroy(&pIter->dIter.bData, 1);

  taosMemoryFree(pIter);
}

// Free a .stt-file iterator: its stt block list, its block buffer, and finally the handle.
static void tsdbCloseSttFileDataIter(STsdbDataIter2* pIter) {
  taosArrayDestroy(pIter->sIter.aSttBlk);
  tBlockDataDestroy(&pIter->sIter.bData, 1);

  taosMemoryFree(pIter);
}

// Dispatch to the close routine matching the iterator's type. Mem-table iterators and unknown types
// are not supported and trip an ASSERT.
static void tsdbCloseDataIter2(STsdbDataIter2* pIter) {
  switch (pIter->type) {
    case TSDB_DATA_FILE_DATA_ITER:
      tsdbCloseDataFileDataIter(pIter);
      break;
    case TSDB_STT_FILE_DATA_ITER:
      tsdbCloseSttFileDataIter(pIter);
      break;
    case TSDB_MEM_TABLE_DATA_ITER:
    default:
      ASSERT(0);
      break;
  }
}

/* cmpr */
static int32_t tsdbDataIterCmprFn(const SRBTreeNode* pNode1, const SRBTreeNode* pNode2) {
  STsdbDataIter2* pIter1 = TSDB_RBTN_TO_DATA_ITER(pNode1);
  STsdbDataIter2* pIter2 = TSDB_RBTN_TO_DATA_ITER(pNode2);
  return tRowInfoCmprFn(&pIter1->rowInfo, &pIter2->rowInfo);
}

/* seek */

/* iter next */
// Advance a .data-file iterator to its next row and publish it in pIter->rowInfo.
// Data blocks and per-table SDataBlk maps are loaded on demand. When every table has been consumed,
// rowInfo is zeroed and 0 is returned. Read errors are logged and returned.
static int32_t tsdbDataFileDataIterNext(STsdbDataIter2* pIter) {
  int32_t code = 0;
  int32_t lino = 0;

  while (true) {
    // A row is still available in the block that is already in memory.
    if (++pIter->dIter.iRow < pIter->dIter.bData.nRow) {
      pIter->rowInfo.suid = pIter->dIter.bData.suid;
      pIter->rowInfo.uid = pIter->dIter.bData.uid;
      pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->dIter.bData, pIter->dIter.iRow);
      break;
    }

    // Block drained: load the next SDataBlk. Whenever the current table's map is used up, move to the
    // next SBlockIdx. Running out of tables ends the iteration.
    while (true) {
      if (++pIter->dIter.iDataBlk < pIter->dIter.mDataBlk.nItem) {
        SDataBlk dataBlk;
        tMapDataGetItemByIdx(&pIter->dIter.mDataBlk, pIter->dIter.iDataBlk, &dataBlk, tGetDataBlk);

        code = tsdbReadDataBlockEx(pIter->dIter.pReader, &dataBlk, &pIter->dIter.bData);
        TSDB_CHECK_CODE(code, lino, _exit);

        pIter->dIter.iRow = -1;
        break;
      }

      if (++pIter->dIter.iBlockIdx >= taosArrayGetSize(pIter->dIter.aBlockIdx)) {
        pIter->rowInfo = (SRowInfo){0};
        goto _exit;
      }

      SBlockIdx* pBlockIdx = taosArrayGet(pIter->dIter.aBlockIdx, pIter->dIter.iBlockIdx);
      code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk);
      TSDB_CHECK_CODE(code, lino, _exit);

      pIter->dIter.iDataBlk = -1;
    }
  }

_exit:
  if (code) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}

// Advance a .stt-file iterator to its next row and publish it in pIter->rowInfo.
// Stt blocks are loaded on demand. When the last block is drained, rowInfo is zeroed and 0 is returned.
// Read errors are logged and returned.
static int32_t tsdbSttFileDataIterNext(STsdbDataIter2* pIter) {
  int32_t code = 0;
  int32_t lino = 0;

  while (true) {
    if (++pIter->sIter.iRow < pIter->sIter.bData.nRow) {
      pIter->rowInfo.suid = pIter->sIter.bData.suid;
      // A zero block-level uid means the uid has to be taken per row from aUid.
      if (pIter->sIter.bData.uid) {
        pIter->rowInfo.uid = pIter->sIter.bData.uid;
      } else {
        pIter->rowInfo.uid = pIter->sIter.bData.aUid[pIter->sIter.iRow];
      }
      pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->sIter.bData, pIter->sIter.iRow);
      break;
    }

    // Block drained; stop once there is no further stt block.
    if (++pIter->sIter.iSttBlk >= taosArrayGetSize(pIter->sIter.aSttBlk)) {
      pIter->rowInfo = (SRowInfo){0};
      break;
    }

    SSttBlk* pSttBlk = taosArrayGet(pIter->sIter.aSttBlk, pIter->sIter.iSttBlk);
    code = tsdbReadSttBlockEx(pIter->sIter.pReader, pIter->sIter.iStt, pSttBlk, &pIter->sIter.bData);
    TSDB_CHECK_CODE(code, lino, _exit);

    pIter->sIter.iRow = -1;
  }

_exit:
  if (code) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}

// Dispatch "advance to next row" to the routine matching the iterator's type.
// Mem-table iterators and unknown types are not supported: they trip an ASSERT and return 0.
static int32_t tsdbDataIterNext2(STsdbDataIter2* pIter) {
  switch (pIter->type) {
    case TSDB_DATA_FILE_DATA_ITER:
      return tsdbDataFileDataIterNext(pIter);
    case TSDB_STT_FILE_DATA_ITER:
      return tsdbSttFileDataIterNext(pIter);
    case TSDB_MEM_TABLE_DATA_ITER:
    default:
      ASSERT(0);
      return 0;
  }
}

/* get */

H
Hongze Cheng 已提交
292
// STsdbSnapReader ========================================
H
Hongze Cheng 已提交
293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315
// Kind of file an SFDataIter walks.
typedef enum { SNAP_DATA_FILE_ITER = 0, SNAP_STT_FILE_ITER } EFIterT;
// Cursor over one file of a file set, used by STsdbSnapReader.
// `n` must remain the first member: the reader casts SFDataIter* <-> SRBTreeNode* directly.
typedef struct {
  SRBTreeNode n;      // node in STsdbSnapReader.rbt (must stay first, see above)
  SRowInfo    rInfo;  // row this iterator is currently positioned on
  EFIterT     type;   // selects which anonymous struct of the union is active
  union {
    struct {
      SArray*    aBlockIdx;  // SArray<SBlockIdx> of the .data file
      int32_t    iBlockIdx;
      SBlockIdx* pBlockIdx;  // aBlockIdx[iBlockIdx]
      SMapData   mBlock;     // SDataBlk map of the current table
      int32_t    iBlock;
    };  // .data file
    struct {
      int32_t iStt;     // index of the .stt file inside the file set
      SArray* aSttBlk;  // SArray<SSttBlk>
      int32_t iSttBlk;
    };  // .stt file
  };
  SBlockData bData;  // block currently loaded from the file
  int32_t    iRow;   // index of rInfo's row inside bData
} SFDataIter;

H
Hongze Cheng 已提交
316
struct STsdbSnapReader {
H
more  
Hongze Cheng 已提交
317 318 319
  STsdb*  pTsdb;
  int64_t sver;
  int64_t ever;
H
Hongze Cheng 已提交
320
  STsdbFS fs;
C
Cary Xu 已提交
321
  int8_t  type;
H
more  
Hongze Cheng 已提交
322
  // for data file
H
Hongze Cheng 已提交
323 324
  int8_t        dataDone;
  int32_t       fid;
H
more  
Hongze Cheng 已提交
325
  SDataFReader* pDataFReader;
H
Hongze Cheng 已提交
326 327
  SFDataIter*   pIter;
  SRBTree       rbt;
H
Hongze Cheng 已提交
328
  SFDataIter    aFDataIter[TSDB_MAX_STT_TRIGGER + 1];
H
Hongze Cheng 已提交
329 330
  SBlockData    bData;
  SSkmInfo      skmTable;
H
more  
Hongze Cheng 已提交
331
  // for del file
H
Hongze Cheng 已提交
332
  int8_t       delDone;
H
more  
Hongze Cheng 已提交
333
  SDelFReader* pDelFReader;
H
Hongze Cheng 已提交
334
  SArray*      aDelIdx;  // SArray<SDelIdx>
H
Hongze Cheng 已提交
335 336
  int32_t      iDelIdx;
  SArray*      aDelData;  // SArray<SDelData>
H
Hongze Cheng 已提交
337
  uint8_t*     aBuf[5];
H
Hongze Cheng 已提交
338 339
};

H
Hongze Cheng 已提交
340 341
// Fills pSkmInfo with the schema of table (suid, uid) looked up in pMeta; tsdbSnapReadData() passes the
// resulting pTSchema to tBlockDataInit(). Defined elsewhere — presumably it skips the lookup when
// pSkmInfo already holds this table's schema; TODO confirm.
extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo);

H
Hongze Cheng 已提交
342 343 344 345 346 347 348
static int32_t tFDataIterCmprFn(const SRBTreeNode* pNode1, const SRBTreeNode* pNode2) {
  SFDataIter* pIter1 = (SFDataIter*)(((uint8_t*)pNode1) - offsetof(SFDataIter, n));
  SFDataIter* pIter2 = (SFDataIter*)(((uint8_t*)pNode2) - offsetof(SFDataIter, n));

  return tRowInfoCmprFn(&pIter1->rInfo, &pIter2->rInfo);
}

H
Hongze Cheng 已提交
349
// Open the next file set for snapshot reading: the first set in pReader->fs whose fid is greater than
// pReader->fid.
// aFDataIter[0] is positioned on the .data file and aFDataIter[1..] on the .stt files. Each iterator
// stops on the first row whose version lies in [sver, ever]; those that find such a row are put into
// pReader->rbt.
// Returns 0 without opening a reader when no further file set exists. Errors are logged with the
// failing line.
static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) {
  int32_t code = 0;
  int32_t lino = 0;

  SDFileSet  dFileSet = {.fid = pReader->fid};
  SDFileSet* pSet = taosArraySearch(pReader->fs.aDFileSet, &dFileSet, tDFileSetCmprFn, TD_GT);
  if (pSet == NULL) return code;

  pReader->fid = pSet->fid;
  code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet);
  TSDB_CHECK_CODE(code, lino, _exit);

  pReader->pIter = NULL;
  tRBTreeCreate(&pReader->rbt, tFDataIterCmprFn);

  // .data file
  SFDataIter* pIter = &pReader->aFDataIter[0];
  pIter->type = SNAP_DATA_FILE_ITER;

  code = tsdbReadBlockIdx(pReader->pDataFReader, pIter->aBlockIdx);
  TSDB_CHECK_CODE(code, lino, _exit);

  for (pIter->iBlockIdx = 0; pIter->iBlockIdx < taosArrayGetSize(pIter->aBlockIdx); pIter->iBlockIdx++) {
    pIter->pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->aBlockIdx, pIter->iBlockIdx);

    code = tsdbReadDataBlk(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock);
    TSDB_CHECK_CODE(code, lino, _exit);

    for (pIter->iBlock = 0; pIter->iBlock < pIter->mBlock.nItem; pIter->iBlock++) {
      SDataBlk dataBlk;
      tMapDataGetItemByIdx(&pIter->mBlock, pIter->iBlock, &dataBlk, tGetDataBlk);

      // skip blocks whose version range does not intersect [sver, ever]
      if (dataBlk.minVer > pReader->ever || dataBlk.maxVer < pReader->sver) continue;

      code = tsdbReadDataBlockEx(pReader->pDataFReader, &dataBlk, &pIter->bData);
      TSDB_CHECK_CODE(code, lino, _exit);

      ASSERT(pIter->pBlockIdx->suid == pIter->bData.suid);
      ASSERT(pIter->pBlockIdx->uid == pIter->bData.uid);

      for (pIter->iRow = 0; pIter->iRow < pIter->bData.nRow; pIter->iRow++) {
        int64_t rowVer = pIter->bData.aVersion[pIter->iRow];

        if (rowVer >= pReader->sver && rowVer <= pReader->ever) {
          pIter->rInfo.suid = pIter->pBlockIdx->suid;
          pIter->rInfo.uid = pIter->pBlockIdx->uid;
          pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
          goto _add_iter_and_break;
        }
      }
    }

    continue;

  _add_iter_and_break:
    tRBTreePut(&pReader->rbt, (SRBTreeNode*)pIter);
    break;
  }

  // .stt files: a slot of aFDataIter[1..] is consumed only when its file has a row in range
  pIter = &pReader->aFDataIter[1];
  for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
    pIter->type = SNAP_STT_FILE_ITER;
    pIter->iStt = iStt;

    code = tsdbReadSttBlk(pReader->pDataFReader, iStt, pIter->aSttBlk);
    TSDB_CHECK_CODE(code, lino, _exit);

    for (pIter->iSttBlk = 0; pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk); pIter->iSttBlk++) {
      SSttBlk* pSttBlk = (SSttBlk*)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk);

      if (pSttBlk->minVer > pReader->ever) continue;
      if (pSttBlk->maxVer < pReader->sver) continue;

      code = tsdbReadSttBlockEx(pReader->pDataFReader, iStt, pSttBlk, &pIter->bData);
      TSDB_CHECK_CODE(code, lino, _exit);

      for (pIter->iRow = 0; pIter->iRow < pIter->bData.nRow; pIter->iRow++) {
        int64_t rowVer = pIter->bData.aVersion[pIter->iRow];

        if (rowVer >= pReader->sver && rowVer <= pReader->ever) {
          pIter->rInfo.suid = pIter->bData.suid;
          pIter->rInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow];
          pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
          goto _add_iter;
        }
      }
    }

    continue;

  _add_iter:
    tRBTreePut(&pReader->rbt, (SRBTreeNode*)pIter);
    pIter++;
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  } else {
    tsdbInfo("vgId:%d, %s done, path:%s, fid:%d", TD_VID(pReader->pTsdb->pVnode), __func__, pReader->pTsdb->path,
             pReader->fid);
  }
  return code;
}

// Move the snapshot reader to its next in-range row (version in [sver, ever]), in tRowInfoCmprFn order
// across all iterators of the current file set.
// When an iterator is current, it is first advanced, loading further blocks on demand. If another
// iterator in rbt then holds a smaller row, the current one is put back into the tree.
// When no iterator is current, the one with the smallest row is taken out of rbt; that path does no I/O.
// On return pReader->pIter is the iterator holding the row, or NULL once the file set is exhausted.
// Read errors are logged with the failing line and returned.
static int32_t tsdbSnapNextRow(STsdbSnapReader* pReader) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pReader->pIter) {
    SFDataIter* pIter = NULL;

    while (true) {
    _find_row:
      pIter = pReader->pIter;
      // next in-range row of the block already loaded
      for (pIter->iRow++; pIter->iRow < pIter->bData.nRow; pIter->iRow++) {
        int64_t rowVer = pIter->bData.aVersion[pIter->iRow];

        if (rowVer >= pReader->sver && rowVer <= pReader->ever) {
          pIter->rInfo.suid = pIter->bData.suid;
          pIter->rInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow];
          pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
          goto _out;
        }
      }

      if (pIter->type == SNAP_DATA_FILE_ITER) {
        // load the next version-overlapping SDataBlk, moving on to following tables as needed
        while (true) {
          for (pIter->iBlock++; pIter->iBlock < pIter->mBlock.nItem; pIter->iBlock++) {
            SDataBlk dataBlk;
            tMapDataGetItemByIdx(&pIter->mBlock, pIter->iBlock, &dataBlk, tGetDataBlk);

            if (dataBlk.minVer > pReader->ever || dataBlk.maxVer < pReader->sver) continue;

            code = tsdbReadDataBlockEx(pReader->pDataFReader, &dataBlk, &pIter->bData);
            TSDB_CHECK_CODE(code, lino, _exit);

            pIter->iRow = -1;
            goto _find_row;
          }

          pIter->iBlockIdx++;
          if (pIter->iBlockIdx >= taosArrayGetSize(pIter->aBlockIdx)) break;

          pIter->pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->aBlockIdx, pIter->iBlockIdx);
          code = tsdbReadDataBlk(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock);
          TSDB_CHECK_CODE(code, lino, _exit);
          pIter->iBlock = -1;
        }

        // .data file exhausted
        pReader->pIter = NULL;
        break;
      } else if (pIter->type == SNAP_STT_FILE_ITER) {
        // load the next version-overlapping stt block
        for (pIter->iSttBlk++; pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk); pIter->iSttBlk++) {
          SSttBlk* pSttBlk = (SSttBlk*)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk);

          if (pSttBlk->minVer > pReader->ever || pSttBlk->maxVer < pReader->sver) continue;

          code = tsdbReadSttBlockEx(pReader->pDataFReader, pIter->iStt, pSttBlk, &pIter->bData);
          TSDB_CHECK_CODE(code, lino, _exit);

          pIter->iRow = -1;
          goto _find_row;
        }

        // .stt file exhausted
        pReader->pIter = NULL;
        break;
      } else {
        ASSERT(0);
      }
    }

  _out:
    // hand the current iterator back to the tree if another one now holds a smaller row
    pIter = (SFDataIter*)tRBTreeMin(&pReader->rbt);
    if (pReader->pIter && pIter) {
      int32_t c = tRowInfoCmprFn(&pReader->pIter->rInfo, &pIter->rInfo);
      if (c > 0) {
        tRBTreePut(&pReader->rbt, (SRBTreeNode*)pReader->pIter);
        pReader->pIter = NULL;
      } else {
        ASSERT(c);  // equal rows from two iterators are not expected
      }
    }
  }

  // no current iterator: take the one holding the smallest row out of the tree
  if (pReader->pIter == NULL) {
    pReader->pIter = (SFDataIter*)tRBTreeMin(&pReader->rbt);
    if (pReader->pIter) {
      tRBTreeDrop(&pReader->rbt, (SRBTreeNode*)pReader->pIter);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
546 547 548 549 550 551 552 553 554 555 556 557 558 559
// Return the reader's current row, first pulling the smallest row from the merge tree when no iterator
// is current. Returns NULL when the current file set has no row left.
// With no current iterator, tsdbSnapNextRow() only pops the tree minimum, so its result is not checked.
static SRowInfo* tsdbSnapGetRow(STsdbSnapReader* pReader) {
  if (pReader->pIter == NULL) {
    tsdbSnapNextRow(pReader);
  }

  return (pReader->pIter != NULL) ? &pReader->pIter->rInfo : NULL;
}

H
Hongze Cheng 已提交
560 561 562
// Compress the batched rows in pReader->bData into a newly allocated SNAP_DATA_TSDB message.
// Layout: SSnapDataHdr followed by the tCmprBlockData() outputs in the order aBuf[3], aBuf[2], aBuf[1],
// aBuf[0]; empty segments are skipped. The payload size covers aBufN[0..3].
// On success the caller owns *ppData. Errors are logged with the failing line and returned.
static int32_t tsdbSnapCmprData(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t code = 0;
  int32_t lino = 0;

  ASSERT(pReader->bData.nRow);

  int32_t aBufN[5] = {0};
  code = tCmprBlockData(&pReader->bData, TWO_STAGE_COMP, NULL, NULL, pReader->aBuf, aBufN);
  TSDB_CHECK_CODE(code, lino, _exit);

  int32_t size = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3];
  *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
  if (*ppData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  SSnapDataHdr* pHdr = (SSnapDataHdr*)*ppData;
  pHdr->type = SNAP_DATA_TSDB;
  pHdr->size = size;

  memcpy(pHdr->data, pReader->aBuf[3], aBufN[3]);
  memcpy(pHdr->data + aBufN[3], pReader->aBuf[2], aBufN[2]);
  if (aBufN[1]) {
    memcpy(pHdr->data + aBufN[3] + aBufN[2], pReader->aBuf[1], aBufN[1]);
  }
  if (aBufN[0]) {
    memcpy(pHdr->data + aBufN[3] + aBufN[2] + aBufN[1], pReader->aBuf[0], aBufN[0]);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

// Produce the next time-series snapshot message in *ppData.
// The message carries up to 4096 in-range rows of a single table, compressed by tsdbSnapCmprData().
// A batch also ends early at a table boundary or at the end of the file set.
// File sets are opened in fid order as earlier ones run dry. *ppData is left untouched (NULL from
// tsdbSnapRead()) once every file set has been consumed.
// Errors are logged with the failing line and returned.
static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdb* pTsdb = pReader->pTsdb;

  while (true) {
    if (pReader->pDataFReader == NULL) {
      code = tsdbSnapReadOpenFile(pReader);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // no further file set
    if (pReader->pDataFReader == NULL) break;

    SRowInfo* pRowInfo = tsdbSnapGetRow(pReader);
    if (pRowInfo == NULL) {
      // this file set has no row in range: move on to the next one
      tsdbDataFReaderClose(&pReader->pDataFReader);
      continue;
    }

    TABLEID     id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid};
    SBlockData* pBlockData = &pReader->bData;

    code = tsdbUpdateTableSchema(pTsdb->pVnode->pMeta, id.suid, id.uid, &pReader->skmTable);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tBlockDataInit(pBlockData, &id, pReader->skmTable.pTSchema, NULL, 0);
    TSDB_CHECK_CODE(code, lino, _exit);

    // batch consecutive rows of the same table
    while (pRowInfo->suid == id.suid && pRowInfo->uid == id.uid) {
      code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, NULL, pRowInfo->uid);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = tsdbSnapNextRow(pReader);
      TSDB_CHECK_CODE(code, lino, _exit);

      pRowInfo = tsdbSnapGetRow(pReader);
      if (pRowInfo == NULL) {
        tsdbDataFReaderClose(&pReader->pDataFReader);
        break;
      }

      if (pBlockData->nRow >= 4096) break;
    }

    code = tsdbSnapCmprData(pReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);

    break;
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s, path:%s", TD_VID(pTsdb->pVnode), __func__, lino,
              tstrerror(code), pTsdb->path);
  }
  return code;
}

// Produce the next tombstone snapshot message (SNAP_DATA_DEL) in *ppData.
// A message covers one table of the del file: SSnapDataHdr, then the table's TABLEID, then its
// SDelData entries whose version lies in [sver, ever]. Tables without such entries are skipped.
// *ppData is left untouched when there is no del file. It is also left untouched after the last table,
// in which case the del reader is closed. Errors are logged with the failing line and returned.
static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdb*    pTsdb = pReader->pTsdb;
  SDelFile* pDelFile = pReader->fs.pDelFile;

  if (pReader->pDelFReader == NULL) {
    if (pDelFile == NULL) {
      goto _exit;
    }

    // open
    code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pTsdb);
    TSDB_CHECK_CODE(code, lino, _exit);

    // read index
    code = tsdbReadDelIdx(pReader->pDelFReader, pReader->aDelIdx);
    TSDB_CHECK_CODE(code, lino, _exit);

    pReader->iDelIdx = 0;
  }

  while (true) {
    if (pReader->iDelIdx >= taosArrayGetSize(pReader->aDelIdx)) {
      tsdbDelFReaderClose(&pReader->pDelFReader);
      break;
    }

    SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pReader->aDelIdx, pReader->iDelIdx);

    pReader->iDelIdx++;

    code = tsdbReadDelData(pReader->pDelFReader, pDelIdx, pReader->aDelData);
    TSDB_CHECK_CODE(code, lino, _exit);

    // first pass: encoded size of the in-range tombstones
    int32_t size = 0;
    for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) {
      SDelData* pDelData = (SDelData*)taosArrayGet(pReader->aDelData, iDelData);

      if (pDelData->version >= pReader->sver && pDelData->version <= pReader->ever) {
        size += tPutDelData(NULL, pDelData);
      }
    }
    if (size == 0) continue;

    // org data
    size = sizeof(TABLEID) + size;
    *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
    if (*ppData == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
    pHdr->type = SNAP_DATA_DEL;
    pHdr->size = size;

    TABLEID* pId = (TABLEID*)(&pHdr[1]);
    pId->suid = pDelIdx->suid;
    pId->uid = pDelIdx->uid;
    // second pass: serialize the same tombstones right after the TABLEID
    int32_t n = sizeof(SSnapDataHdr) + sizeof(TABLEID);
    for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) {
      SDelData* pDelData = (SDelData*)taosArrayGet(pReader->aDelData, iDelData);

      if (pDelData->version < pReader->sver) continue;
      if (pDelData->version > pReader->ever) continue;

      n += tPutDelData((*ppData) + n, pDelData);
    }

    tsdbInfo("vgId:%d, vnode snapshot tsdb read del data for %s, suid:%" PRId64 " uid:%" PRId64 " size:%d",
             TD_VID(pTsdb->pVnode), pTsdb->path, pDelIdx->suid, pDelIdx->uid, size);

    break;
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s, path:%s", TD_VID(pTsdb->pVnode), __func__, lino,
              tstrerror(code), pTsdb->path);
  }
  return code;
}
H
more  
Hongze Cheng 已提交
734

C
Cary Xu 已提交
735
// Create a snapshot reader over versions [sver, ever] of pTsdb.
// The current file-system state is referenced under the tsdb read lock. All data-file iterators and
// buffers are allocated up front.
// On success *ppReader owns the reader, to be released with tsdbSnapReaderClose().
// On failure *ppReader is NULL and every resource acquired here is released: the per-file iterators, and
// the fs reference (via tsdbFSUnref(), as tsdbSnapReaderClose() does).
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, STsdbSnapReader** ppReader) {
  int32_t          code = 0;
  int32_t          lino = 0;
  int8_t           fsRefed = 0;  // set once pReader->fs holds a reference taken by tsdbFSRef()
  STsdbSnapReader* pReader = NULL;

  // alloc
  pReader = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  pReader->pTsdb = pTsdb;
  pReader->sver = sver;
  pReader->ever = ever;
  pReader->type = type;

  // reference the current file system under the read lock
  code = taosThreadRwlockRdlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tsdbFSRef(pTsdb, &pReader->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  fsRefed = 1;

  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // data
  pReader->fid = INT32_MIN;
  for (int32_t iIter = 0; iIter < sizeof(pReader->aFDataIter) / sizeof(pReader->aFDataIter[0]); iIter++) {
    SFDataIter* pIter = &pReader->aFDataIter[iIter];

    if (iIter == 0) {
      pIter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
      if (pIter->aBlockIdx == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    } else {
      pIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
      if (pIter->aSttBlk == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }

    code = tBlockDataCreate(&pIter->bData);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tBlockDataCreate(&pReader->bData);
  TSDB_CHECK_CODE(code, lino, _exit);

  // del
  pReader->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
  if (pReader->aDelIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  pReader->aDelData = taosArrayInit(0, sizeof(SDelData));
  if (pReader->aDelData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s, TSDB path: %s", TD_VID(pTsdb->pVnode), __func__, lino,
              tstrerror(code), pTsdb->path);
    *ppReader = NULL;

    if (pReader) {
      taosArrayDestroy(pReader->aDelData);
      taosArrayDestroy(pReader->aDelIdx);
      tBlockDataDestroy(&pReader->bData, 1);

      // per-file iterators, released the same way tsdbSnapReaderClose() does (untouched slots are zeroed)
      for (int32_t iIter = 0; iIter < sizeof(pReader->aFDataIter) / sizeof(pReader->aFDataIter[0]); iIter++) {
        SFDataIter* pIter = &pReader->aFDataIter[iIter];

        if (iIter == 0) {
          taosArrayDestroy(pIter->aBlockIdx);
        } else {
          taosArrayDestroy(pIter->aSttBlk);
        }
        tBlockDataDestroy(&pIter->bData, 1);
      }

      if (fsRefed) {
        // pairs with the successful tsdbFSRef() above, like tsdbSnapReaderClose()
        tsdbFSUnref(pTsdb, &pReader->fs);
      } else {
        // NOTE(review): tsdbFSRef() failed or was never reached; kept the previous tsdbFSDestroy() cleanup —
        // confirm it is correct for a partially referenced fs.
        tsdbFSDestroy(&pReader->fs);
      }
      taosMemoryFree(pReader);
    }
  } else {
    *ppReader = pReader;
    tsdbInfo("vgId:%d, vnode snapshot tsdb reader opened for %s", TD_VID(pTsdb->pVnode), pTsdb->path);
  }
  return code;
}

H
Hongze Cheng 已提交
827
// Release a reader created by tsdbSnapReaderOpen(), including any data/del file reader still open and
// the fs reference, then set *ppReader to NULL.
// A NULL ppReader or *ppReader is a no-op. Always returns 0.
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
  int32_t code = 0;

  if (ppReader == NULL || *ppReader == NULL) return code;

  STsdbSnapReader* pReader = *ppReader;

  // data
  if (pReader->pDataFReader) tsdbDataFReaderClose(&pReader->pDataFReader);
  for (int32_t iIter = 0; iIter < sizeof(pReader->aFDataIter) / sizeof(pReader->aFDataIter[0]); iIter++) {
    SFDataIter* pIter = &pReader->aFDataIter[iIter];

    if (iIter == 0) {
      taosArrayDestroy(pIter->aBlockIdx);
      tMapDataClear(&pIter->mBlock);
    } else {
      taosArrayDestroy(pIter->aSttBlk);
    }

    tBlockDataDestroy(&pIter->bData, 1);
  }

  tBlockDataDestroy(&pReader->bData, 1);
  tDestroyTSchema(pReader->skmTable.pTSchema);

  // del
  if (pReader->pDelFReader) tsdbDelFReaderClose(&pReader->pDelFReader);
  taosArrayDestroy(pReader->aDelIdx);
  taosArrayDestroy(pReader->aDelData);

  tsdbFSUnref(pReader->pTsdb, &pReader->fs);

  tsdbInfo("vgId:%d, vnode snapshot tsdb reader closed for %s", TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path);

  for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(pReader->aBuf[0]); iBuf++) {
    tFree(pReader->aBuf[iBuf]);
  }

  taosMemoryFree(pReader);
  *ppReader = NULL;
  return code;
}

// Return the next snapshot message in *ppData: all time-series data first, then the tombstones.
// *ppData is NULL once both sources are exhausted. The caller owns any returned buffer.
// Errors are logged with the failing line and returned.
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t code = 0;
  int32_t lino = 0;

  *ppData = NULL;

  // read data file
  if (!pReader->dataDone) {
    code = tsdbSnapReadData(pReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (*ppData) {
      goto _exit;
    } else {
      pReader->dataDone = 1;
    }
  }

  // read del file
  if (!pReader->delDone) {
    code = tsdbSnapReadDel(pReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (*ppData) {
      goto _exit;
    } else {
      pReader->delDone = 1;
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s, path:%s", TD_VID(pReader->pTsdb->pVnode), __func__, lino,
              tstrerror(code), pReader->pTsdb->path);
  } else {
    tsdbDebug("vgId:%d, %s done, path:%s", TD_VID(pReader->pTsdb->pVnode), __func__, pReader->pTsdb->path);
  }
  return code;
}

H
Hongze Cheng 已提交
905
// STsdbSnapWriter ========================================
H
Hongze Cheng 已提交
906
struct STsdbSnapWriter {
H
Hongze Cheng 已提交
907 908 909
  STsdb*   pTsdb;
  int64_t  sver;
  int64_t  ever;
H
Hongze Cheng 已提交
910 911 912 913 914 915
  int32_t  minutes;
  int8_t   precision;
  int32_t  minRow;
  int32_t  maxRow;
  int8_t   cmprAlg;
  int64_t  commitID;
H
Hongze Cheng 已提交
916
  uint8_t* aBuf[5];
H
Hongze Cheng 已提交
917

H
Hongze Cheng 已提交
918
  STsdbFS fs;
H
Hongze Cheng 已提交
919

H
Hongze Cheng 已提交
920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942
  // time-series data
  SBlockData inData;

  int32_t  fid;
  TABLEID  tbid;
  SSkmInfo skmTable;

  /* reader */
  SDataFReader*   pDataFReader;
  STsdbDataIter2* iterList;
  STsdbDataIter2* pDIter;
  STsdbDataIter2* pIter;
  SRBTree         rbt;  // SRBTree<STsdbDataIter2>

  /* writer */
  SDataFWriter* pDataFWriter;
  SArray*       aBlockIdx;
  SMapData      mDataBlk;  // SMapData<SDataBlk>
  SArray*       aSttBlk;   // SArray<SSttBlk>
  SBlockData    bData;
  SBlockData    sData;

  // tombstone data
H
Hongze Cheng 已提交
943
  SDelFReader* pDelFReader;
H
Hongze Cheng 已提交
944
  SDelFWriter* pDelFWriter;
H
Hongze Cheng 已提交
945
  int32_t      iDelIdx;
H
Hongze Cheng 已提交
946
  SArray*      aDelIdxR;
H
Hongze Cheng 已提交
947
  SArray*      aDelData;
H
Hongze Cheng 已提交
948
  SArray*      aDelIdxW;
H
Hongze Cheng 已提交
949 950
};

H
Hongze Cheng 已提交
951
// SNAP_DATA_TSDB
H
Hongze Cheng 已提交
952 953 954
extern int32_t tsdbWriteDataBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SMapData* mDataBlk, int8_t cmprAlg);
extern int32_t tsdbWriteSttBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SArray* aSttBlk, int8_t cmprAlg);

H
Hongze Cheng 已提交
955
/*
 * Advance the legacy data-file reader to the next table's block index.
 *
 * The implementation is disabled (#if 0): it targets dReader fields that the
 * current STsdbSnapWriter no longer has. As compiled, this returns 0 and has
 * no side effects.
 */
static int32_t tsdbSnapNextTableData(STsdbSnapWriter* pWriter) {
  int32_t code = 0;
  int32_t lino = 0;

#if 0
  ASSERT(pWriter->dReader.iRow >= pWriter->dReader.bData.nRow);

  if (pWriter->dReader.iBlockIdx < taosArrayGetSize(pWriter->dReader.aBlockIdx)) {
    pWriter->dReader.pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->dReader.aBlockIdx, pWriter->dReader.iBlockIdx);

    code = tsdbReadDataBlk(pWriter->dReader.pReader, pWriter->dReader.pBlockIdx, &pWriter->dReader.mDataBlk);
    if (code) goto _exit;

    pWriter->dReader.iBlockIdx++;
  } else {
    pWriter->dReader.pBlockIdx = NULL;
    tMapDataReset(&pWriter->dReader.mDataBlk);
  }
  pWriter->dReader.iDataBlk = 0;  // point to the next one
  tBlockDataReset(&pWriter->dReader.bData);
  pWriter->dReader.iRow = 0;
#endif

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

/*
 * Copy, table by table, the old data file's block maps for every table that
 * sorts before *pId.
 *
 * The implementation is disabled (#if 0): it targets dReader/dWriter fields
 * that the current STsdbSnapWriter no longer has. As compiled, this returns 0
 * and has no side effects.
 */
static int32_t tsdbSnapWriteCopyData(STsdbSnapWriter* pWriter, TABLEID* pId) {
  int32_t code = 0;
  int32_t lino = 0;

#if 0
  while (true) {
    if (pWriter->dReader.pBlockIdx == NULL) break;
    if (tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, pId) >= 0) break;

    SBlockIdx blkIdx = *pWriter->dReader.pBlockIdx;
    code = tsdbWriteDataBlk(pWriter->dWriter.pWriter, &pWriter->dReader.mDataBlk, &blkIdx);
    if (code) goto _exit;

    if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blkIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _exit;
    }

    code = tsdbSnapNextTableData(pWriter);
    if (code) goto _exit;
  }
#endif

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
1015 1016
/*
 * Prepare to write rows of table *pId into the current data file.
 *
 * When the old data file has an iterator (pWriter->pDIter), its block index
 * is advanced up to *pId:
 *  - every table ordered before *pId is carried over whole: its SDataBlk map
 *    is read and passed to tsdbWriteDataBlk, which fills a new SBlockIdx
 *    entry appended to pWriter->aBlockIdx;
 *  - if *pId itself is present, its block map is loaded and the block/row
 *    cursors are rewound so tsdbSnapWriteTableData can merge against it;
 *  - if *pId is absent, the iterator is left exposing no blocks and no rows,
 *    so data of a previously visited table is never merged into *pId;
 *  - once the index is exhausted, pWriter->pDIter is cleared.
 * Finally the current table id is switched and the output block map reset.
 */
static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pId) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pWriter->pDIter) {
    STsdbDataIter2* pIter = pWriter->pDIter;
    for (;;) {
      if (pIter->dIter.iBlockIdx + 1 >= taosArrayGetSize(pIter->dIter.aBlockIdx)) {
        pWriter->pDIter = NULL;
        break;
      }

      SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->dIter.aBlockIdx, pIter->dIter.iBlockIdx + 1);

      int32_t c = tTABLEIDCmprFn(pBlockIdx, pId);
      if (c < 0) {
        ++pIter->dIter.iBlockIdx;

        code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk);
        TSDB_CHECK_CODE(code, lino, _exit);

        SBlockIdx* pNewBlockIdx = taosArrayReserve(pWriter->aBlockIdx, 1);
        if (pNewBlockIdx == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          TSDB_CHECK_CODE(code, lino, _exit);
        }

        pNewBlockIdx->suid = pBlockIdx->suid;
        pNewBlockIdx->uid = pBlockIdx->uid;

        // fill the NEW index entry; the reader's index array must stay untouched
        code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pIter->dIter.mDataBlk, pNewBlockIdx);
        TSDB_CHECK_CODE(code, lino, _exit);
      } else if (c == 0) {
        ++pIter->dIter.iBlockIdx;

        code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk);
        TSDB_CHECK_CODE(code, lino, _exit);

        // rewind: no block and no row of this table consumed yet
        pIter->dIter.iDataBlk = -1;
        tBlockDataReset(&pIter->dIter.bData);
        pIter->dIter.iRow = -1;

        break;
      } else {
        // *pId has no data in the old file: expose nothing to merge
        tMapDataReset(&pIter->dIter.mDataBlk);
        pIter->dIter.iDataBlk = pIter->dIter.mDataBlk.nItem;  // no block left
        tBlockDataReset(&pIter->dIter.bData);
        pIter->dIter.iRow = -1;

        break;
      }
    }
  }

  pWriter->tbid = *pId;

  tMapDataReset(&pWriter->mDataBlk);

#if 0
  code = tsdbSnapWriteCopyData(pWriter, pId);
  TSDB_CHECK_CODE(code, lino, _exit);

  pWriter->id.suid = pId->suid;
  pWriter->id.uid = pId->uid;

  code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pId->suid, pId->uid, &pWriter->skmTable);
  TSDB_CHECK_CODE(code, lino, _exit);

  tMapDataReset(&pWriter->dWriter.mDataBlk);
  code = tBlockDataInit(&pWriter->dWriter.bData, pId, pWriter->skmTable.pTSchema, NULL, 0);
  TSDB_CHECK_CODE(code, lino, _exit);
#endif

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

/*
 * Finish the table currently being written.
 *
 * The implementation is disabled (#if 0): it targets dReader/dWriter/id
 * fields that the current STsdbSnapWriter no longer has. As compiled, this
 * returns 0 and has no side effects.
 */
static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
  int32_t code = 0;
  int32_t lino = 0;

#if 0
  if (pWriter->id.suid == 0 && pWriter->id.uid == 0) return code;

  int32_t c = 1;
  if (pWriter->dReader.pBlockIdx) {
    c = tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, &pWriter->id);
    ASSERT(c >= 0);
  }

  if (c == 0) {
    SBlockData* pBData = &pWriter->dWriter.bData;

    for (; pWriter->dReader.iRow < pWriter->dReader.bData.nRow; pWriter->dReader.iRow++) {
      TSDBROW row = tsdbRowFromBlockData(&pWriter->dReader.bData, pWriter->dReader.iRow);

      code = tBlockDataAppendRow(pBData, &row, NULL, pWriter->id.uid);
      if (code) goto _err;

      if (pBData->nRow >= pWriter->maxRow) {
        code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, pBData, &pWriter->dWriter.mDataBlk, pWriter->cmprAlg);
        if (code) goto _err;
      }
    }

    code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, pBData, &pWriter->dWriter.mDataBlk, pWriter->cmprAlg);
    if (code) goto _err;

    for (; pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem; pWriter->dReader.iDataBlk++) {
      SDataBlk dataBlk;
      tMapDataGetItemByIdx(&pWriter->dReader.mDataBlk, pWriter->dReader.iDataBlk, &dataBlk, tGetDataBlk);

      code = tMapDataPutItem(&pWriter->dWriter.mDataBlk, &dataBlk, tPutDataBlk);
      if (code) goto _err;
    }

    code = tsdbSnapNextTableData(pWriter);
    if (code) goto _err;
  }

  if (pWriter->dWriter.mDataBlk.nItem) {
    SBlockIdx blockIdx = {.suid = pWriter->id.suid, .uid = pWriter->id.uid};
    code = tsdbWriteDataBlk(pWriter->dWriter.pWriter, &pWriter->dWriter.mDataBlk, &blockIdx);

    if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }

  pWriter->id.suid = 0;
  pWriter->id.uid = 0;
#endif

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
1152
/*
 * Switch the writer to data file `fid`.
 *
 * Reader side: when pWriter->fs already has a file set for `fid`, open it,
 * put its data-file iterator (pDIter) on pWriter->iterList, and for every stt
 * file open an iterator, step it to its first row, insert it into pWriter->rbt
 * and chain it on pWriter->iterList.
 *
 * Writer side: open a new file set on the existing set's disk (or on a disk
 * from tfsAllocDisk at level 0), with a fresh head file and a single fresh stt
 * file; the data and sma files are taken from the existing set when there is
 * one. The per-file output buffers (aBlockIdx, mDataBlk, aSttBlk, bData) are
 * cleared or created.
 *
 * NOTE(review): the return values of tfsAllocDisk / tfsMkdirRecurAt are not
 * checked.
 */
static int32_t tsdbSnapWriteOpenDataFile(STsdbSnapWriter* pWriter, int32_t fid) {
  int32_t code = 0;
  int32_t lino = 0;

  ASSERT(pWriter->pDataFWriter == NULL && pWriter->fid < fid);

  STsdb* pTsdb = pWriter->pTsdb;

  pWriter->fid = fid;
  pWriter->tbid = (TABLEID){0};

  SDFileSet* pOldSet = taosArraySearch(pWriter->fs.aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ);

  /* ---- reader side ---- */
  pWriter->pDataFReader = NULL;
  pWriter->iterList = NULL;
  pWriter->pDIter = NULL;
  pWriter->pIter = NULL;
  tRBTreeCreate(&pWriter->rbt, tsdbDataIterCmprFn);

  if (pOldSet != NULL) {
    code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pOldSet);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbOpenDataFileDataIter(pWriter->pDataFReader, &pWriter->pDIter);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (pWriter->pDIter != NULL) {
      pWriter->pDIter->next = pWriter->iterList;
      pWriter->iterList = pWriter->pDIter;
    }

    for (int32_t i = 0; i < pOldSet->nSttF; ++i) {
      code = tsdbOpenSttFileDataIter(pWriter->pDataFReader, i, &pWriter->pIter);
      TSDB_CHECK_CODE(code, lino, _exit);

      if (pWriter->pIter == NULL) continue;

      code = tsdbSttFileDataIterNext(pWriter->pIter);
      TSDB_CHECK_CODE(code, lino, _exit);

      tRBTreePut(&pWriter->rbt, &pWriter->pIter->rbtn);

      pWriter->pIter->next = pWriter->iterList;
      pWriter->iterList = pWriter->pIter;
    }

    pWriter->pIter = NULL;
  }

  /* ---- writer side ---- */
  SDiskID did;
  if (pOldSet == NULL) {
    tfsAllocDisk(pTsdb->pVnode->pTfs, 0 /*TODO*/, &did);
    tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did);
  } else {
    did = pOldSet->diskId;
  }

  SDFileSet newSet = {.diskId = did,
                      .fid = fid,
                      .pHeadF = &(SHeadFile){.commitID = pWriter->commitID},
                      .pDataF = (pOldSet) ? pOldSet->pDataF : &(SDataFile){.commitID = pWriter->commitID},
                      .pSmaF = (pOldSet) ? pOldSet->pSmaF : &(SSmaFile){.commitID = pWriter->commitID},
                      .nSttF = 1,
                      .aSttF = {&(SSttFile){.commitID = pWriter->commitID}}};
  code = tsdbDataFWriterOpen(&pWriter->pDataFWriter, pTsdb, &newSet);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (pWriter->aBlockIdx == NULL) {
    pWriter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
    if (pWriter->aBlockIdx == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  } else {
    taosArrayClear(pWriter->aBlockIdx);
  }

  tMapDataReset(&pWriter->mDataBlk);

  if (pWriter->aSttBlk == NULL) {
    pWriter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
    if (pWriter->aSttBlk == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  } else {
    taosArrayClear(pWriter->aSttBlk);
  }

  tBlockDataReset(&pWriter->bData);

_exit:
  if (code == 0) {
    tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(pTsdb->pVnode), __func__, fid);
  } else {
    tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code),
              fid);
  }
  return code;
}

H
Hongze Cheng 已提交
1247
/*
 * Finish the data file currently being written.
 *
 * Steps:
 *  1. Write the stt-block index and the block index.
 *  2. Update the file-set header.
 *  3. Upsert the new file set into pWriter->fs.
 *  4. Close the data-file writer (tsdbDataFWriterClose(..., 1)).
 *  5. Close the reader of the old file set, if one was opened.
 *
 * NOTE(review): rows still pending in pWriter->bData / pWriter->mDataBlk for
 * the last table are not flushed here (see the disabled loop and its TODO).
 * The iterators chained on pWriter->iterList are not released either (see the
 * "clear sources" TODO). Confirm before relying on this path.
 */
static int32_t tsdbSnapWriteCloseDataFile(STsdbSnapWriter* pWriter) {
  int32_t code = 0;
  int32_t lino = 0;

  ASSERT(pWriter->pDataFWriter);

#if 0
  // loop write remain data
  for (;;) {
    SRowInfo* pRowInfo;

    code = tsdbSnapWriteGetRow(pWriter, &pRowInfo);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (pRowInfo == NULL) break;

    code = tsdbSnapWriteTableData(pWriter, pRowInfo);
    TSDB_CHECK_CODE(code, lino, _exit);

    // tsdbSnapWriteNextRow now takes an optional SRowInfo** out-parameter
    code = tsdbSnapWriteNextRow(pWriter, NULL);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // TODO: write remain data
#endif

  // do file-level updates
  code = tsdbWriteSttBlk(pWriter->pDataFWriter, pWriter->aSttBlk);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdx);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbUpdateDFileSetHeader(pWriter->pDataFWriter);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbFSUpsertFSet(&pWriter->fs, &pWriter->pDataFWriter->wSet);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 1);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (pWriter->pDataFReader) {
    code = tsdbDataFReaderClose(&pWriter->pDataFReader);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // TODO: do clear sources
  {}

_exit:
  if (code) {
    // report the failing line like the sibling writer functions do
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s is done", TD_VID(pWriter->pTsdb->pVnode), __func__);
  }
  return code;
}

H
Hongze Cheng 已提交
1306
/*
 * Merge row iRow of pWriter->bData into the data file being written,
 * setting *done once the row has been emitted.
 *
 * The implementation is disabled (#if 0): it targets dReader/dWriter fields
 * that the current STsdbSnapWriter no longer has. As compiled, this returns 0
 * and leaves *done untouched.
 */
static int32_t tsdbSnapWriteToDataFile(STsdbSnapWriter* pWriter, int32_t iRow, int8_t* done) {
  int32_t code = 0;
  int32_t lino = 0;

#if 0
  SBlockData* pBData = &pWriter->bData;
  TABLEID     id = {.suid = pBData->suid, .uid = pBData->uid ? pBData->uid : pBData->aUid[iRow]};
  TSDBROW     row = tsdbRowFromBlockData(pBData, iRow);
  TSDBKEY     key = TSDBROW_KEY(&row);

  *done = 0;
  while (pWriter->dReader.iRow < pWriter->dReader.bData.nRow ||
         pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem) {
    // Merge row by row
    for (; pWriter->dReader.iRow < pWriter->dReader.bData.nRow; pWriter->dReader.iRow++) {
      TSDBROW trow = tsdbRowFromBlockData(&pWriter->dReader.bData, pWriter->dReader.iRow);
      TSDBKEY tKey = TSDBROW_KEY(&trow);

      ASSERT(pWriter->dReader.bData.suid == id.suid && pWriter->dReader.bData.uid == id.uid);

      int32_t c = tsdbKeyCmprFn(&key, &tKey);
      if (c < 0) {
        code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, id.uid);
        TSDB_CHECK_CODE(code, lino, _exit);
      } else if (c > 0) {
        code = tBlockDataAppendRow(&pWriter->dWriter.bData, &trow, NULL, id.uid);
        TSDB_CHECK_CODE(code, lino, _exit);
      } else {
        ASSERT(0);
      }

      if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) {
        code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk,
                                  pWriter->cmprAlg);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      if (c < 0) {
        *done = 1;
        goto _exit;
      }
    }

    // Merge row by block
    SDataBlk tDataBlk = {.minKey = key, .maxKey = key};
    for (; pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem; pWriter->dReader.iDataBlk++) {
      SDataBlk dataBlk;
      tMapDataGetItemByIdx(&pWriter->dReader.mDataBlk, pWriter->dReader.iDataBlk, &dataBlk, tGetDataBlk);

      int32_t c = tDataBlkCmprFn(&dataBlk, &tDataBlk);
      if (c < 0) {
        code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk,
                                  pWriter->cmprAlg);
        TSDB_CHECK_CODE(code, lino, _exit);

        code = tMapDataPutItem(&pWriter->dWriter.mDataBlk, &dataBlk, tPutDataBlk);
        TSDB_CHECK_CODE(code, lino, _exit);
      } else if (c > 0) {
        code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, id.uid);
        TSDB_CHECK_CODE(code, lino, _exit);

        if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) {
          code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk,
                                    pWriter->cmprAlg);
          TSDB_CHECK_CODE(code, lino, _exit);
        }

        *done = 1;
        goto _exit;
      } else {
        code = tsdbReadDataBlockEx(pWriter->dReader.pReader, &dataBlk, &pWriter->dReader.bData);
        TSDB_CHECK_CODE(code, lino, _exit);
        pWriter->dReader.iRow = 0;

        pWriter->dReader.iDataBlk++;
        break;
      }
    }
  }
#endif

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
1394
/*
 * Append row iRow of pWriter->bData to the pending stt block, flushing it
 * through tsdbWriteSttBlock when the schema changes or maxRow is reached.
 *
 * The implementation is disabled (#if 0): it targets dWriter/id fields that
 * the current STsdbSnapWriter no longer has. As compiled, this returns 0.
 */
static int32_t tsdbSnapWriteToSttFile(STsdbSnapWriter* pWriter, int32_t iRow) {
  int32_t code = 0;
  int32_t lino = 0;

#if 0
  TABLEID     id = {.suid = pWriter->bData.suid,
                    .uid = pWriter->bData.uid ? pWriter->bData.uid : pWriter->bData.aUid[iRow]};
  TSDBROW     row = tsdbRowFromBlockData(&pWriter->bData, iRow);
  SBlockData* pBData = &pWriter->dWriter.sData;

  if (pBData->suid || pBData->uid) {
    if (!TABLE_SAME_SCHEMA(pBData->suid, pBData->uid, id.suid, id.uid)) {
      code = tsdbWriteSttBlock(pWriter->pDataFWriter, pBData, pWriter->aSttBlk, pWriter->cmprAlg);
      TSDB_CHECK_CODE(code, lino, _exit);

      pBData->suid = 0;
      pBData->uid = 0;
    }
  }

  if (pBData->suid == 0 && pBData->uid == 0) {
    code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pWriter->id.suid, pWriter->id.uid, &pWriter->skmTable);
    TSDB_CHECK_CODE(code, lino, _exit);

    TABLEID tid = {.suid = pWriter->id.suid, .uid = pWriter->id.suid ? 0 : pWriter->id.uid};
    code = tBlockDataInit(pBData, &tid, pWriter->skmTable.pTSchema, NULL, 0);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tBlockDataAppendRow(pBData, &row, NULL, id.uid);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (pBData->nRow >= pWriter->maxRow) {
    code = tsdbWriteSttBlock(pWriter->pDataFWriter, pBData, pWriter->aSttBlk, pWriter->cmprAlg);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
#endif

_exit:
  if (code) {
    // report the failing line like the sibling writer functions do
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
1439
/*
 * Step the k-way merge over the stt iterators by one row.
 *
 * pWriter->pIter is the iterator supplying the current (smallest) row; the
 * other live iterators sit in pWriter->rbt. After stepping pIter:
 *  - if it reports suid == 0 && uid == 0 it is dropped as exhausted;
 *  - if its new row sorts after the tree minimum it goes back into the tree.
 * With no current iterator left, the tree minimum (if any) is popped and
 * becomes current. When ppRowInfo is non-NULL it receives the current row, or
 * NULL when every iterator is exhausted. Equal rows trip ASSERT.
 */
static int32_t tsdbSnapWriteNextRow(STsdbSnapWriter* pWriter, SRowInfo** ppRowInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdbDataIter2* pCur = pWriter->pIter;
  if (pCur != NULL) {
    code = tsdbDataIterNext2(pCur);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (pCur->rowInfo.suid == 0 && pCur->rowInfo.uid == 0) {
      pWriter->pIter = NULL;
    } else {
      SRBTreeNode* pMin = tRBTreeMin(&pWriter->rbt);
      if (pMin != NULL) {
        int32_t cmp = tsdbDataIterCmprFn(&pCur->rbtn, pMin);
        ASSERT(cmp != 0);
        if (cmp > 0) {
          tRBTreePut(&pWriter->rbt, &pCur->rbtn);
          pWriter->pIter = NULL;
        }
      }
    }
  }

  if (pWriter->pIter == NULL) {
    SRBTreeNode* pMin = tRBTreeMin(&pWriter->rbt);
    if (pMin != NULL) {
      tRBTreeDrop(&pWriter->rbt, pMin);
      pWriter->pIter = TSDB_RBTN_TO_DATA_ITER(pMin);
    }
  }

  if (ppRowInfo != NULL) {
    *ppRowInfo = (pWriter->pIter != NULL) ? &pWriter->pIter->rowInfo : NULL;
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

/*
 * Return in *ppRowInfo the current row of the stt merge without consuming it.
 * If no iterator is current yet, the merge is stepped once via
 * tsdbSnapWriteNextRow (which yields NULL when nothing is left).
 */
static int32_t tsdbSnapWriteGetRow(STsdbSnapWriter* pWriter, SRowInfo** ppRowInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pWriter->pIter == NULL) {
    code = tsdbSnapWriteNextRow(pWriter, ppRowInfo);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    *ppRowInfo = &pWriter->pIter->rowInfo;
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}
H
Hongze Cheng 已提交
1504

H
Hongze Cheng 已提交
1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515
/* Placeholder for writing one merged row; not implemented yet (asserts). */
static int32_t tsdbSnapWriteRowImpl(STsdbSnapWriter* pWriter, SRowInfo* pRowInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  ASSERT(0);  // TODO: implement

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
1519 1520 1521
/*
 * Append *pRow, tagged with the current table uid, to the pending output
 * block; once the block holds maxRow rows it is written to the data file and
 * its SDataBlk recorded in pWriter->mDataBlk.
 */
static int32_t tsdbSnapWriteTableRow(STsdbSnapWriter* pWriter, TSDBROW* pRow) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SBlockData* pOut = &pWriter->bData;

  code = tBlockDataAppendRow(pOut, pRow, NULL, pWriter->tbid.uid);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (pOut->nRow < pWriter->maxRow) goto _exit;  // block not full yet

  code = tsdbWriteDataBlock(pWriter->pDataFWriter, pOut, &pWriter->mDataBlk, pWriter->cmprAlg);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}
H
Hongze Cheng 已提交
1537

H
Hongze Cheng 已提交
1538 1539 1540
/*
 * Write one incoming row of table (pRowInfo->suid, pRowInfo->uid), merged
 * with the rows the old data file already holds for that table.
 *
 * When the row belongs to a new table, the previous table (if any) is
 * finished and the new one started. Existing data that sorts before the
 * incoming row is then emitted first:
 *  - rows of the currently loaded old block are appended one by one;
 *  - an old block lying entirely before the row is reused as-is: pending
 *    output rows are flushed, then its SDataBlk is added to pWriter->mDataBlk;
 *  - an old block overlapping the row is loaded and merged row by row.
 * The incoming row is written as soon as the next existing row/block sorts
 * after it, or when the table's old data is exhausted. Equal keys trip ASSERT.
 */
static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  // switch to new table if need
  if (pRowInfo->uid != pWriter->tbid.uid) {
    if (pWriter->tbid.uid) {  // finish the previous table, if there is one
      code = tsdbSnapWriteTableDataEnd(pWriter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbSnapWriteTableDataStart(pWriter, (TABLEID*)pRowInfo);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // do write the row
  if (pWriter->pDIter != NULL) {
    STsdbDataIter2* pDIter = pWriter->pDIter;
    TSDBKEY         key = TSDBROW_KEY(&pRowInfo->row);

    for (;;) {
      // 1. remaining rows of the currently loaded old block
      while (pDIter->dIter.iRow + 1 < pDIter->dIter.bData.nRow) {
        TSDBROW row = tsdbRowFromBlockData(&pDIter->dIter.bData, pDIter->dIter.iRow + 1);

        int32_t c = tsdbRowCmprFn(&pRowInfo->row, &row);
        if (c < 0) {
          goto _write_incoming_row;
        } else if (c > 0) {
          ++pDIter->dIter.iRow;

          code = tsdbSnapWriteTableRow(pWriter, &row);
          TSDB_CHECK_CODE(code, lino, _exit);
        } else {
          ASSERT(0);
        }
      }

      // 2. no old block left for this table: the incoming row comes next
      //    (iDataBlk is the index of the last consumed block, -1 for none)
      if (pDIter->dIter.iDataBlk + 1 >= pDIter->dIter.mDataBlk.nItem) {
        goto _write_incoming_row;
      }

      SDataBlk dataBlk;
      tMapDataGetItemByIdx(&pDIter->dIter.mDataBlk, pDIter->dIter.iDataBlk + 1, &dataBlk, tGetDataBlk);

      int32_t c = tDataBlkCmprFn(&dataBlk, &(SDataBlk){.minKey = key, .maxKey = key});
      if (c > 0) {
        // next old block starts after the incoming row
        goto _write_incoming_row;
      }

      ++pDIter->dIter.iDataBlk;
      if (c < 0) {
        // whole old block precedes the row: flush pending rows so output stays
        // ordered, then reuse the block in the NEW file's block map
        if (pWriter->bData.nRow > 0) {
          code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg);
          TSDB_CHECK_CODE(code, lino, _exit);
        }

        code = tMapDataPutItem(&pWriter->mDataBlk, &dataBlk, tPutDataBlk);
        TSDB_CHECK_CODE(code, lino, _exit);
      } else {
        // overlapping block: load its rows and merge them in step 1
        code = tsdbReadDataBlockEx(pWriter->pDataFReader, &dataBlk, &pDIter->dIter.bData);
        TSDB_CHECK_CODE(code, lino, _exit);

        pDIter->dIter.iRow = -1;
      }
    }
  }

_write_incoming_row:
  code = tsdbSnapWriteTableRow(pWriter, &pRowInfo->row);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}
H
Hongze Cheng 已提交
1610

H
Hongze Cheng 已提交
1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672
/*
 * Write one time-series snapshot message.
 *
 * pHdr is decoded into pWriter->inData. If the block's first key maps to a
 * different data file, the current file is closed and the new one opened.
 * Every incoming row is then merged, in order, with the rows supplied by the
 * stt iterators (tsdbSnapWriteGetRow / tsdbSnapWriteNextRow): whichever sorts
 * first by tRowInfoCmprFn is handed to tsdbSnapWriteTableData. Equal rows
 * trip ASSERT.
 *
 * NOTE(review): the fid comes from the first row only, so this assumes a
 * message never spans two data files. Confirm against the snapshot reader.
 */
static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tDecmprBlockData(pHdr->data, pHdr->size, &pWriter->inData, pWriter->aBuf);
  TSDB_CHECK_CODE(code, lino, _exit);

  ASSERT(pWriter->inData.nRow > 0);

  // switch to new data file if need
  int32_t fid = tsdbKeyFid(pWriter->inData.aTSKEY[0], pWriter->minutes, pWriter->precision);
  if (pWriter->fid != fid) {
    if (pWriter->pDataFWriter) {
      code = tsdbSnapWriteCloseDataFile(pWriter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbSnapWriteOpenDataFile(pWriter, fid);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // loop write each row
  SRowInfo* pRowInfo;

  code = tsdbSnapWriteGetRow(pWriter, &pRowInfo);
  TSDB_CHECK_CODE(code, lino, _exit);

  // iterate the decoded INCOMING rows; pWriter->bData is the output buffer
  for (int32_t iRow = 0; iRow < pWriter->inData.nRow; ++iRow) {
    SRowInfo rInfo = {.suid = pWriter->inData.suid,
                      .uid = pWriter->inData.uid ? pWriter->inData.uid : pWriter->inData.aUid[iRow],
                      .row = tsdbRowFromBlockData(&pWriter->inData, iRow)};

    for (;;) {
      if (pRowInfo == NULL) {
        code = tsdbSnapWriteTableData(pWriter, &rInfo);
        TSDB_CHECK_CODE(code, lino, _exit);
        break;
      } else {
        int32_t c = tRowInfoCmprFn(&rInfo, pRowInfo);
        if (c < 0) {
          code = tsdbSnapWriteTableData(pWriter, &rInfo);
          TSDB_CHECK_CODE(code, lino, _exit);
          break;
        } else if (c > 0) {
          code = tsdbSnapWriteTableData(pWriter, pRowInfo);
          TSDB_CHECK_CODE(code, lino, _exit);

          code = tsdbSnapWriteNextRow(pWriter, &pRowInfo);
          TSDB_CHECK_CODE(code, lino, _exit);
        } else {
          ASSERT(0);
        }
      }
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64 " nRow:%d", TD_VID(pWriter->pTsdb->pVnode), __func__,
              pWriter->inData.suid, pWriter->inData.uid, pWriter->inData.nRow);
  }
  return code;
}

H
Hongze Cheng 已提交
1676
// SNAP_DATA_DEL
H
Hongze Cheng 已提交
1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705
static int32_t tsdbSnapMoveWriteDelData(STsdbSnapWriter* pWriter, TABLEID* pId) {
  int32_t code = 0;

  while (true) {
    if (pWriter->iDelIdx >= taosArrayGetSize(pWriter->aDelIdxR)) break;

    SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);

    if (tTABLEIDCmprFn(pDelIdx, pId) >= 0) break;

    code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData);
    if (code) goto _exit;

    SDelIdx delIdx = *pDelIdx;
    code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx);
    if (code) goto _exit;

    if (taosArrayPush(pWriter->aDelIdxW, &delIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _exit;
    }

    pWriter->iDelIdx++;
  }

_exit:
  return code;
}

H
Hongze Cheng 已提交
1706
static int32_t tsdbSnapWriteDelData(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr) {
H
Hongze Cheng 已提交
1707
  int32_t code = 0;
H
Hongze Cheng 已提交
1708 1709
  STsdb*  pTsdb = pWriter->pTsdb;

H
Hongze Cheng 已提交
1710
  // Open del file if not opened yet
H
Hongze Cheng 已提交
1711
  if (pWriter->pDelFWriter == NULL) {
H
Hongze Cheng 已提交
1712
    SDelFile* pDelFile = pWriter->fs.pDelFile;
H
Hongze Cheng 已提交
1713 1714 1715

    // reader
    if (pDelFile) {
H
Hongze Cheng 已提交
1716
      code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pDelFile, pTsdb);
H
Hongze Cheng 已提交
1717 1718
      if (code) goto _err;

H
Hongze Cheng 已提交
1719
      code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdxR);
H
Hongze Cheng 已提交
1720
      if (code) goto _err;
H
Hongze Cheng 已提交
1721 1722
    } else {
      taosArrayClear(pWriter->aDelIdxR);
H
Hongze Cheng 已提交
1723
    }
H
Hongze Cheng 已提交
1724
    pWriter->iDelIdx = 0;
H
Hongze Cheng 已提交
1725 1726

    // writer
H
Hongze Cheng 已提交
1727
    SDelFile delFile = {.commitID = pWriter->commitID};
H
Hongze Cheng 已提交
1728 1729
    code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &delFile, pTsdb);
    if (code) goto _err;
H
Hongze Cheng 已提交
1730
    taosArrayClear(pWriter->aDelIdxW);
H
Hongze Cheng 已提交
1731 1732
  }

H
Hongze Cheng 已提交
1733
  TABLEID id = *(TABLEID*)pHdr->data;
H
Hongze Cheng 已提交
1734

H
Hongze Cheng 已提交
1735 1736 1737
  // Move write data < id
  code = tsdbSnapMoveWriteDelData(pWriter, &id);
  if (code) goto _err;
H
Hongze Cheng 已提交
1738

H
Hongze Cheng 已提交
1739
  // Merge incoming data with current
H
Hongze Cheng 已提交
1740 1741 1742
  if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR) &&
      tTABLEIDCmprFn(taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx), &id) == 0) {
    SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
H
Hongze Cheng 已提交
1743

H
Hongze Cheng 已提交
1744
    code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData);
H
Hongze Cheng 已提交
1745 1746
    if (code) goto _err;

H
Hongze Cheng 已提交
1747 1748 1749 1750 1751
    pWriter->iDelIdx++;
  } else {
    taosArrayClear(pWriter->aDelData);
  }

H
Hongze Cheng 已提交
1752 1753
  int64_t n = sizeof(TABLEID);
  while (n < pHdr->size) {
H
Hongze Cheng 已提交
1754 1755
    SDelData delData;

H
Hongze Cheng 已提交
1756
    n += tGetDelData(pHdr->data + n, &delData);
H
Hongze Cheng 已提交
1757 1758

    if (taosArrayPush(pWriter->aDelData, &delData) == NULL) {
H
Hongze Cheng 已提交
1759 1760 1761
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
H
Hongze Cheng 已提交
1762
  }
H
Hongze Cheng 已提交
1763

H
Hongze Cheng 已提交
1764 1765 1766 1767 1768 1769 1770
  SDelIdx delIdx = {.suid = id.suid, .uid = id.uid};
  code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx);
  if (code) goto _err;

  if (taosArrayPush(pWriter->aDelIdxW, &delIdx) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
H
Hongze Cheng 已提交
1771 1772 1773 1774 1775
  }

  return code;

_err:
S
Shengliang Guan 已提交
1776
  tsdbError("vgId:%d, vnode snapshot tsdb write del for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
C
Cary Xu 已提交
1777
            tstrerror(code));
H
Hongze Cheng 已提交
1778 1779 1780
  return code;
}

H
Hongze Cheng 已提交
1781 1782
static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
1783 1784
  STsdb*  pTsdb = pWriter->pTsdb;

H
Hongze Cheng 已提交
1785
  if (pWriter->pDelFWriter == NULL) return code;
H
Hongze Cheng 已提交
1786

H
Hongze Cheng 已提交
1787 1788 1789
  TABLEID id = {.suid = INT64_MAX, .uid = INT64_MAX};
  code = tsdbSnapMoveWriteDelData(pWriter, &id);
  if (code) goto _err;
H
Hongze Cheng 已提交
1790

H
Hongze Cheng 已提交
1791 1792 1793
  code = tsdbWriteDelIdx(pWriter->pDelFWriter, pWriter->aDelIdxW);
  if (code) goto _err;

H
Hongze Cheng 已提交
1794 1795 1796
  code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter);
  if (code) goto _err;

H
Hongze Cheng 已提交
1797
  code = tsdbFSUpsertDelFile(&pWriter->fs, &pWriter->pDelFWriter->fDel);
H
Hongze Cheng 已提交
1798 1799 1800 1801 1802 1803 1804 1805 1806 1807
  if (code) goto _err;

  code = tsdbDelFWriterClose(&pWriter->pDelFWriter, 1);
  if (code) goto _err;

  if (pWriter->pDelFReader) {
    code = tsdbDelFReaderClose(&pWriter->pDelFReader);
    if (code) goto _err;
  }

S
Shengliang Guan 已提交
1808
  tsdbInfo("vgId:%d, vnode snapshot tsdb write del for %s end", TD_VID(pTsdb->pVnode), pTsdb->path);
H
Hongze Cheng 已提交
1809 1810 1811
  return code;

_err:
S
Shengliang Guan 已提交
1812
  tsdbError("vgId:%d, vnode snapshot tsdb write del end for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
C
Cary Xu 已提交
1813
            tstrerror(code));
H
Hongze Cheng 已提交
1814 1815 1816
  return code;
}

H
Hongze Cheng 已提交
1817
// APIs
H
Hongze Cheng 已提交
1818
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) {
H
Hongze Cheng 已提交
1819 1820
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
1821 1822

  // alloc
H
Hongze Cheng 已提交
1823
  STsdbSnapWriter* pWriter = (STsdbSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
H
Hongze Cheng 已提交
1824 1825
  if (pWriter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1826
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1827 1828 1829 1830
  }
  pWriter->pTsdb = pTsdb;
  pWriter->sver = sver;
  pWriter->ever = ever;
H
Hongze Cheng 已提交
1831 1832 1833 1834 1835
  pWriter->minutes = pTsdb->keepCfg.days;
  pWriter->precision = pTsdb->keepCfg.precision;
  pWriter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
  pWriter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
  pWriter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
H
Hongze Cheng 已提交
1836 1837
  pWriter->commitID = pTsdb->pVnode->state.commitID;

H
Hongze Cheng 已提交
1838 1839 1840
  code = tsdbFSCopy(pTsdb, &pWriter->fs);
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
1841
  // SNAP_DATA_TSDB
H
Hongze Cheng 已提交
1842 1843 1844 1845 1846 1847 1848 1849 1850
#if 1
  pWriter->fid = INT32_MIN;

  code = tBlockDataCreate(&pWriter->inData);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tBlockDataCreate(&pWriter->bData);
  TSDB_CHECK_CODE(code, lino, _exit);
#else
H
Hongze Cheng 已提交
1851
  code = tBlockDataCreate(&pWriter->bData);
H
Hongze Cheng 已提交
1852
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1853

H
Hongze Cheng 已提交
1854 1855 1856 1857 1858
  pWriter->fid = INT32_MIN;
  pWriter->id = (TABLEID){0};
  // Reader
  pWriter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pWriter->dReader.aBlockIdx == NULL) {
H
Hongze Cheng 已提交
1859
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1860
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1861
  }
H
Hongze Cheng 已提交
1862
  code = tBlockDataCreate(&pWriter->dReader.bData);
H
Hongze Cheng 已提交
1863
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1864

H
Hongze Cheng 已提交
1865 1866 1867
  // Writer
  pWriter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pWriter->dWriter.aBlockIdx == NULL) {
H
Hongze Cheng 已提交
1868
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1869
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1870
  }
H
Hongze Cheng 已提交
1871 1872
  pWriter->dWriter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
  if (pWriter->dWriter.aSttBlk == NULL) {
H
Hongze Cheng 已提交
1873
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1874
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1875
  }
H
Hongze Cheng 已提交
1876
  code = tBlockDataCreate(&pWriter->dWriter.bData);
H
Hongze Cheng 已提交
1877
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1878
  code = tBlockDataCreate(&pWriter->dWriter.sData);
H
Hongze Cheng 已提交
1879
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1880
#endif
H
Hongze Cheng 已提交
1881

H
Hongze Cheng 已提交
1882
  // SNAP_DATA_DEL
H
Hongze Cheng 已提交
1883 1884 1885
  pWriter->aDelIdxR = taosArrayInit(0, sizeof(SDelIdx));
  if (pWriter->aDelIdxR == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1886
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1887 1888 1889 1890
  }
  pWriter->aDelData = taosArrayInit(0, sizeof(SDelData));
  if (pWriter->aDelData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1891
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1892 1893 1894 1895
  }
  pWriter->aDelIdxW = taosArrayInit(0, sizeof(SDelIdx));
  if (pWriter->aDelIdxW == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1896
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1897
  }
H
Hongze Cheng 已提交
1898

H
Hongze Cheng 已提交
1899 1900
_exit:
  if (code) {
S
Shengliang Guan 已提交
1901
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
1902 1903 1904
    *ppWriter = NULL;

    if (pWriter) {
H
Hongze Cheng 已提交
1905
#if 0
H
Hongze Cheng 已提交
1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917
      if (pWriter->aDelIdxW) taosArrayDestroy(pWriter->aDelIdxW);
      if (pWriter->aDelData) taosArrayDestroy(pWriter->aDelData);
      if (pWriter->aDelIdxR) taosArrayDestroy(pWriter->aDelIdxR);
      tBlockDataDestroy(&pWriter->dWriter.sData, 1);
      tBlockDataDestroy(&pWriter->dWriter.bData, 1);
      if (pWriter->dWriter.aSttBlk) taosArrayDestroy(pWriter->dWriter.aSttBlk);
      if (pWriter->dWriter.aBlockIdx) taosArrayDestroy(pWriter->dWriter.aBlockIdx);
      tBlockDataDestroy(&pWriter->dReader.bData, 1);
      if (pWriter->dReader.aBlockIdx) taosArrayDestroy(pWriter->dReader.aBlockIdx);
      tBlockDataDestroy(&pWriter->bData, 1);
      tsdbFSDestroy(&pWriter->fs);
      taosMemoryFree(pWriter);
H
Hongze Cheng 已提交
1918
#endif
H
Hongze Cheng 已提交
1919 1920
    }
  } else {
S
Shengliang Guan 已提交
1921
    tsdbInfo("vgId:%d, %s done", TD_VID(pTsdb->pVnode), __func__);
H
Hongze Cheng 已提交
1922 1923
    *ppWriter = pWriter;
  }
H
Hongze Cheng 已提交
1924 1925 1926
  return code;
}

H
Hongze Cheng 已提交
1927 1928
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
1929 1930
  if (pWriter->pDataFWriter) {
    code = tsdbSnapWriteCloseDataFile(pWriter);
H
Hongze Cheng 已提交
1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941
    if (code) goto _exit;
  }

  code = tsdbSnapWriteDelEnd(pWriter);
  if (code) goto _exit;

  code = tsdbFSPrepareCommit(pWriter->pTsdb, &pWriter->fs);
  if (code) goto _exit;

_exit:
  if (code) {
S
Shengliang Guan 已提交
1942
    tsdbError("vgId:%d, %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
H
Hongze Cheng 已提交
1943 1944 1945 1946
  }
  return code;
}

H
Hongze Cheng 已提交
1947
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
H
Hongze Cheng 已提交
1948
  int32_t          code = 0;
H
Hongze Cheng 已提交
1949
  STsdbSnapWriter* pWriter = *ppWriter;
H
Hongze Cheng 已提交
1950
  STsdb*           pTsdb = pWriter->pTsdb;
H
Hongze Cheng 已提交
1951 1952

  if (rollback) {
H
Hongze Cheng 已提交
1953
    tsdbRollbackCommit(pWriter->pTsdb);
H
Hongze Cheng 已提交
1954
  } else {
H
Hongze Cheng 已提交
1955 1956 1957
    // lock
    taosThreadRwlockWrlock(&pTsdb->rwLock);

H
Hongze Cheng 已提交
1958
    code = tsdbFSCommit(pWriter->pTsdb);
H
Hongze Cheng 已提交
1959 1960 1961 1962 1963 1964 1965
    if (code) {
      taosThreadRwlockUnlock(&pTsdb->rwLock);
      goto _err;
    }

    // unlock
    taosThreadRwlockUnlock(&pTsdb->rwLock);
H
Hongze Cheng 已提交
1966 1967
  }

H
Hongze Cheng 已提交
1968 1969 1970 1971 1972 1973 1974
  // SNAP_DATA_DEL
  taosArrayDestroy(pWriter->aDelIdxW);
  taosArrayDestroy(pWriter->aDelData);
  taosArrayDestroy(pWriter->aDelIdxR);

  // SNAP_DATA_TSDB

H
Hongze Cheng 已提交
1975 1976 1977 1978 1979 1980
  // // Writer
  // tBlockDataDestroy(&pWriter->dWriter.sData, 1);
  // tBlockDataDestroy(&pWriter->dWriter.bData, 1);
  // taosArrayDestroy(pWriter->dWriter.aSttBlk);
  // tMapDataClear(&pWriter->dWriter.mDataBlk);
  // taosArrayDestroy(pWriter->dWriter.aBlockIdx);
H
Hongze Cheng 已提交
1981

H
Hongze Cheng 已提交
1982 1983 1984 1985
  // // Reader
  // tBlockDataDestroy(&pWriter->dReader.bData, 1);
  // tMapDataClear(&pWriter->dReader.mDataBlk);
  // taosArrayDestroy(pWriter->dReader.aBlockIdx);
H
Hongze Cheng 已提交
1986 1987

  tBlockDataDestroy(&pWriter->bData, 1);
1988
  tDestroyTSchema(pWriter->skmTable.pTSchema);
H
Hongze Cheng 已提交
1989

H
Hongze Cheng 已提交
1990 1991 1992
  for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t*); iBuf++) {
    tFree(pWriter->aBuf[iBuf]);
  }
S
Shengliang Guan 已提交
1993
  tsdbInfo("vgId:%d, %s done", TD_VID(pWriter->pTsdb->pVnode), __func__);
H
Hongze Cheng 已提交
1994 1995 1996 1997 1998
  taosMemoryFree(pWriter);
  *ppWriter = NULL;
  return code;

_err:
S
Shengliang Guan 已提交
1999
  tsdbError("vgId:%d, vnode snapshot tsdb writer close for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
C
Cary Xu 已提交
2000 2001 2002
            pWriter->pTsdb->path, tstrerror(code));
  taosMemoryFree(pWriter);
  *ppWriter = NULL;
H
Hongze Cheng 已提交
2003 2004 2005
  return code;
}

H
Hongze Cheng 已提交
2006 2007 2008
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr) {
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
2009

C
Cary Xu 已提交
2010
  if (pHdr->type == SNAP_DATA_TSDB) {
H
Hongze Cheng 已提交
2011 2012
    code = tsdbSnapWriteTimeSeriesData(pWriter, pHdr);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
2013
    goto _exit;
H
Hongze Cheng 已提交
2014 2015 2016
  } else if (pWriter->pDataFWriter) {
    code = tsdbSnapWriteCloseDataFile(pWriter);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
2017 2018
  }

C
Cary Xu 已提交
2019
  if (pHdr->type == SNAP_DATA_DEL) {
H
Hongze Cheng 已提交
2020 2021 2022
    code = tsdbSnapWriteDelData(pWriter, pHdr);
    TSDB_CHECK_CODE(code, lino, _exit);
    goto _exit;
H
Hongze Cheng 已提交
2023 2024
  }

H
Hongze Cheng 已提交
2025
_exit:
H
Hongze Cheng 已提交
2026 2027 2028 2029 2030 2031 2032
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s, type:%d index:%" PRId64 " size:%" PRId64,
              TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code), pHdr->type, pHdr->index, pHdr->size);
  } else {
    tsdbDebug("vgId:%d %s done, type:%d index:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), __func__,
              pHdr->type, pHdr->index, pHdr->size);
  }
H
Hongze Cheng 已提交
2033 2034
  return code;
}