tsdbSnapshot.c 43.3 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

H
Hongze Cheng 已提交
18
// STsdbSnapReader ========================================
H
Hongze Cheng 已提交
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
typedef enum { SNAP_DATA_FILE_ITER = 0, SNAP_STT_FILE_ITER } EFIterT;
typedef struct {
  SRBTreeNode n;
  SRowInfo    rInfo;
  EFIterT     type;
  union {
    struct {
      SArray*    aBlockIdx;
      int32_t    iBlockIdx;
      SBlockIdx* pBlockIdx;
      SMapData   mBlock;
      int32_t    iBlock;
    };  // .data file
    struct {
      int32_t iStt;
      SArray* aSttBlk;
      int32_t iSttBlk;
    };  // .stt file
  };
  SBlockData bData;
  int32_t    iRow;
} SFDataIter;

H
Hongze Cheng 已提交
42
struct STsdbSnapReader {
H
more  
Hongze Cheng 已提交
43 44 45
  STsdb*  pTsdb;
  int64_t sver;
  int64_t ever;
H
Hongze Cheng 已提交
46
  STsdbFS fs;
C
Cary Xu 已提交
47
  int8_t  type;
H
more  
Hongze Cheng 已提交
48
  // for data file
H
Hongze Cheng 已提交
49 50
  int8_t        dataDone;
  int32_t       fid;
H
more  
Hongze Cheng 已提交
51
  SDataFReader* pDataFReader;
H
Hongze Cheng 已提交
52 53
  SFDataIter*   pIter;
  SRBTree       rbt;
H
Hongze Cheng 已提交
54
  SFDataIter    aFDataIter[TSDB_MAX_STT_TRIGGER + 1];
H
Hongze Cheng 已提交
55 56
  SBlockData    bData;
  SSkmInfo      skmTable;
H
more  
Hongze Cheng 已提交
57
  // for del file
H
Hongze Cheng 已提交
58
  int8_t       delDone;
H
more  
Hongze Cheng 已提交
59
  SDelFReader* pDelFReader;
H
Hongze Cheng 已提交
60
  SArray*      aDelIdx;  // SArray<SDelIdx>
H
Hongze Cheng 已提交
61 62
  int32_t      iDelIdx;
  SArray*      aDelData;  // SArray<SDelData>
H
Hongze Cheng 已提交
63
  uint8_t*     aBuf[5];
H
Hongze Cheng 已提交
64 65
};

H
Hongze Cheng 已提交
66 67 68 69
extern int32_t tRowInfoCmprFn(const void* p1, const void* p2);
extern int32_t tsdbReadDataBlockEx(SDataFReader* pReader, SDataBlk* pDataBlk, SBlockData* pBlockData);
extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo);

H
Hongze Cheng 已提交
70 71 72 73 74 75 76
static int32_t tFDataIterCmprFn(const SRBTreeNode* pNode1, const SRBTreeNode* pNode2) {
  SFDataIter* pIter1 = (SFDataIter*)(((uint8_t*)pNode1) - offsetof(SFDataIter, n));
  SFDataIter* pIter2 = (SFDataIter*)(((uint8_t*)pNode2) - offsetof(SFDataIter, n));

  return tRowInfoCmprFn(&pIter1->rInfo, &pIter2->rInfo);
}

H
Hongze Cheng 已提交
77
static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) {
H
Hongze Cheng 已提交
78 79
  int32_t code = 0;

H
Hongze Cheng 已提交
80 81 82
  SDFileSet  dFileSet = {.fid = pReader->fid};
  SDFileSet* pSet = taosArraySearch(pReader->fs.aDFileSet, &dFileSet, tDFileSetCmprFn, TD_GT);
  if (pSet == NULL) return code;
H
Hongze Cheng 已提交
83

H
Hongze Cheng 已提交
84 85 86 87 88
  pReader->fid = pSet->fid;
  code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet);
  if (code) goto _err;

  pReader->pIter = NULL;
H
Hongze Cheng 已提交
89
  tRBTreeCreate(&pReader->rbt, tFDataIterCmprFn);
H
Hongze Cheng 已提交
90 91 92 93 94 95 96 97 98 99 100

  // .data file
  SFDataIter* pIter = &pReader->aFDataIter[0];
  pIter->type = SNAP_DATA_FILE_ITER;

  code = tsdbReadBlockIdx(pReader->pDataFReader, pIter->aBlockIdx);
  if (code) goto _err;

  for (pIter->iBlockIdx = 0; pIter->iBlockIdx < taosArrayGetSize(pIter->aBlockIdx); pIter->iBlockIdx++) {
    pIter->pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->aBlockIdx, pIter->iBlockIdx);

H
Hongze Cheng 已提交
101
    code = tsdbReadDataBlk(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock);
H
Hongze Cheng 已提交
102 103 104 105 106 107 108
    if (code) goto _err;

    for (pIter->iBlock = 0; pIter->iBlock < pIter->mBlock.nItem; pIter->iBlock++) {
      SDataBlk dataBlk;
      tMapDataGetItemByIdx(&pIter->mBlock, pIter->iBlock, &dataBlk, tGetDataBlk);

      if (dataBlk.minVer > pReader->ever || dataBlk.maxVer < pReader->sver) continue;
H
Hongze Cheng 已提交
109

H
Hongze Cheng 已提交
110
      code = tsdbReadDataBlockEx(pReader->pDataFReader, &dataBlk, &pIter->bData);
H
Hongze Cheng 已提交
111 112
      if (code) goto _err;

113 114
      ASSERT(pIter->pBlockIdx->suid == pIter->bData.suid);
      ASSERT(pIter->pBlockIdx->uid == pIter->bData.uid);
H
Hongze Cheng 已提交
115

H
Hongze Cheng 已提交
116 117
      for (pIter->iRow = 0; pIter->iRow < pIter->bData.nRow; pIter->iRow++) {
        int64_t rowVer = pIter->bData.aVersion[pIter->iRow];
H
Hongze Cheng 已提交
118

H
Hongze Cheng 已提交
119 120 121 122 123 124
        if (rowVer >= pReader->sver && rowVer <= pReader->ever) {
          pIter->rInfo.suid = pIter->pBlockIdx->suid;
          pIter->rInfo.uid = pIter->pBlockIdx->uid;
          pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
          goto _add_iter_and_break;
        }
H
Hongze Cheng 已提交
125
      }
H
Hongze Cheng 已提交
126
    }
H
Hongze Cheng 已提交
127

H
Hongze Cheng 已提交
128
    continue;
H
Hongze Cheng 已提交
129

H
Hongze Cheng 已提交
130 131 132 133
  _add_iter_and_break:
    tRBTreePut(&pReader->rbt, (SRBTreeNode*)pIter);
    break;
  }
H
Hongze Cheng 已提交
134

H
Hongze Cheng 已提交
135 136 137 138 139
  // .stt file
  pIter = &pReader->aFDataIter[1];
  for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
    pIter->type = SNAP_STT_FILE_ITER;
    pIter->iStt = iStt;
H
Hongze Cheng 已提交
140

H
Hongze Cheng 已提交
141 142 143 144 145 146 147 148 149
    code = tsdbReadSttBlk(pReader->pDataFReader, iStt, pIter->aSttBlk);
    if (code) goto _err;

    for (pIter->iSttBlk = 0; pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk); pIter->iSttBlk++) {
      SSttBlk* pSttBlk = (SSttBlk*)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk);

      if (pSttBlk->minVer > pReader->ever) continue;
      if (pSttBlk->maxVer < pReader->sver) continue;

H
Hongze Cheng 已提交
150
      code = tsdbReadSttBlockEx(pReader->pDataFReader, iStt, pSttBlk, &pIter->bData);
H
Hongze Cheng 已提交
151 152 153 154 155 156 157
      if (code) goto _err;

      for (pIter->iRow = 0; pIter->iRow < pIter->bData.nRow; pIter->iRow++) {
        int64_t rowVer = pIter->bData.aVersion[pIter->iRow];

        if (rowVer >= pReader->sver && rowVer <= pReader->ever) {
          pIter->rInfo.suid = pIter->bData.suid;
H
Hongze Cheng 已提交
158
          pIter->rInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow];
H
Hongze Cheng 已提交
159 160 161 162
          pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
          goto _add_iter;
        }
      }
H
Hongze Cheng 已提交
163
    }
H
Hongze Cheng 已提交
164

H
Hongze Cheng 已提交
165
    continue;
H
Hongze Cheng 已提交
166

H
Hongze Cheng 已提交
167 168 169 170
  _add_iter:
    tRBTreePut(&pReader->rbt, (SRBTreeNode*)pIter);
    pIter++;
  }
H
Hongze Cheng 已提交
171

H
Hongze Cheng 已提交
172 173 174 175 176
  tsdbInfo("vgId:%d, vnode snapshot tsdb open data file to read for %s, fid:%d", TD_VID(pReader->pTsdb->pVnode),
           pReader->pTsdb->path, pReader->fid);
  return code;

_err:
S
Shengliang Guan 已提交
177
  tsdbError("vgId:%d, vnode snapshot tsdb snap read open file failed since %s", TD_VID(pReader->pTsdb->pVnode),
H
Hongze Cheng 已提交
178 179 180 181 182 183 184 185
            tstrerror(code));
  return code;
}

static int32_t tsdbSnapNextRow(STsdbSnapReader* pReader) {
  int32_t code = 0;

  if (pReader->pIter) {
H
Hongze Cheng 已提交
186
    SFDataIter* pIter = NULL;
H
Hongze Cheng 已提交
187 188
    while (true) {
    _find_row:
H
Hongze Cheng 已提交
189
      pIter = pReader->pIter;
H
Hongze Cheng 已提交
190 191 192 193 194 195 196
      for (pIter->iRow++; pIter->iRow < pIter->bData.nRow; pIter->iRow++) {
        int64_t rowVer = pIter->bData.aVersion[pIter->iRow];

        if (rowVer >= pReader->sver && rowVer <= pReader->ever) {
          pIter->rInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow];
          pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
          goto _out;
H
Hongze Cheng 已提交
197
        }
H
Hongze Cheng 已提交
198
      }
H
Hongze Cheng 已提交
199

H
Hongze Cheng 已提交
200 201 202 203 204
      if (pIter->type == SNAP_DATA_FILE_ITER) {
        while (true) {
          for (pIter->iBlock++; pIter->iBlock < pIter->mBlock.nItem; pIter->iBlock++) {
            SDataBlk dataBlk;
            tMapDataGetItemByIdx(&pIter->mBlock, pIter->iBlock, &dataBlk, tGetDataBlk);
H
Hongze Cheng 已提交
205

H
Hongze Cheng 已提交
206 207 208
            if (dataBlk.minVer > pReader->ever || dataBlk.maxVer < pReader->sver) continue;

            code = tsdbReadDataBlockEx(pReader->pDataFReader, &dataBlk, &pIter->bData);
H
Hongze Cheng 已提交
209 210
            if (code) goto _err;

H
Hongze Cheng 已提交
211 212
            pIter->iRow = -1;
            goto _find_row;
H
Hongze Cheng 已提交
213
          }
H
Hongze Cheng 已提交
214 215 216 217 218

          pIter->iBlockIdx++;
          if (pIter->iBlockIdx >= taosArrayGetSize(pIter->aBlockIdx)) break;

          pIter->pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->aBlockIdx, pIter->iBlockIdx);
H
Hongze Cheng 已提交
219
          code = tsdbReadDataBlk(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock);
H
Hongze Cheng 已提交
220 221
          if (code) goto _err;
          pIter->iBlock = -1;
H
Hongze Cheng 已提交
222
        }
H
Hongze Cheng 已提交
223

H
Hongze Cheng 已提交
224
        pReader->pIter = NULL;
H
Hongze Cheng 已提交
225
        break;
H
Hongze Cheng 已提交
226 227 228
      } else if (pIter->type == SNAP_STT_FILE_ITER) {
        for (pIter->iSttBlk++; pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk); pIter->iSttBlk++) {
          SSttBlk* pSttBlk = (SSttBlk*)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk);
H
Hongze Cheng 已提交
229

H
Hongze Cheng 已提交
230
          if (pSttBlk->minVer > pReader->ever || pSttBlk->maxVer < pReader->sver) continue;
H
Hongze Cheng 已提交
231

H
Hongze Cheng 已提交
232
          code = tsdbReadSttBlockEx(pReader->pDataFReader, pIter->iStt, pSttBlk, &pIter->bData);
H
Hongze Cheng 已提交
233 234 235 236
          if (code) goto _err;

          pIter->iRow = -1;
          goto _find_row;
H
Hongze Cheng 已提交
237
        }
H
Hongze Cheng 已提交
238 239

        pReader->pIter = NULL;
H
Hongze Cheng 已提交
240
        break;
H
Hongze Cheng 已提交
241
      } else {
242
        ASSERT(0);
H
Hongze Cheng 已提交
243 244 245 246 247 248 249 250 251 252 253
      }
    }

  _out:
    pIter = (SFDataIter*)tRBTreeMin(&pReader->rbt);
    if (pReader->pIter && pIter) {
      int32_t c = tRowInfoCmprFn(&pReader->pIter->rInfo, &pIter->rInfo);
      if (c > 0) {
        tRBTreePut(&pReader->rbt, (SRBTreeNode*)pReader->pIter);
        pReader->pIter = NULL;
      } else {
254
        ASSERT(c);
H
Hongze Cheng 已提交
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271
      }
    }
  }

  if (pReader->pIter == NULL) {
    pReader->pIter = (SFDataIter*)tRBTreeMin(&pReader->rbt);
    if (pReader->pIter) {
      tRBTreeDrop(&pReader->rbt, (SRBTreeNode*)pReader->pIter);
    }
  }

  return code;

_err:
  return code;
}

H
Hongze Cheng 已提交
272 273 274 275 276 277 278 279 280 281 282 283 284 285
static SRowInfo* tsdbSnapGetRow(STsdbSnapReader* pReader) {
  if (pReader->pIter) {
    return &pReader->pIter->rInfo;
  } else {
    tsdbSnapNextRow(pReader);

    if (pReader->pIter) {
      return &pReader->pIter->rInfo;
    } else {
      return NULL;
    }
  }
}

H
Hongze Cheng 已提交
286 287 288
static int32_t tsdbSnapCmprData(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t code = 0;

289
  ASSERT(pReader->bData.nRow);
H
Hongze Cheng 已提交
290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342

  int32_t aBufN[5] = {0};
  code = tCmprBlockData(&pReader->bData, TWO_STAGE_COMP, NULL, NULL, pReader->aBuf, aBufN);
  if (code) goto _exit;

  int32_t size = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3];
  *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
  if (*ppData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  SSnapDataHdr* pHdr = (SSnapDataHdr*)*ppData;
  pHdr->type = SNAP_DATA_TSDB;
  pHdr->size = size;

  memcpy(pHdr->data, pReader->aBuf[3], aBufN[3]);
  memcpy(pHdr->data + aBufN[3], pReader->aBuf[2], aBufN[2]);
  if (aBufN[1]) {
    memcpy(pHdr->data + aBufN[3] + aBufN[2], pReader->aBuf[1], aBufN[1]);
  }
  if (aBufN[0]) {
    memcpy(pHdr->data + aBufN[3] + aBufN[2] + aBufN[1], pReader->aBuf[0], aBufN[0]);
  }

_exit:
  return code;
}

static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t code = 0;
  STsdb*  pTsdb = pReader->pTsdb;

  while (true) {
    if (pReader->pDataFReader == NULL) {
      code = tsdbSnapReadOpenFile(pReader);
      if (code) goto _err;
    }

    if (pReader->pDataFReader == NULL) break;

    SRowInfo* pRowInfo = tsdbSnapGetRow(pReader);
    if (pRowInfo == NULL) {
      tsdbDataFReaderClose(&pReader->pDataFReader);
      continue;
    }

    TABLEID     id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid};
    SBlockData* pBlockData = &pReader->bData;

    code = tsdbUpdateTableSchema(pTsdb->pVnode->pMeta, id.suid, id.uid, &pReader->skmTable);
    if (code) goto _err;

H
Hongze Cheng 已提交
343
    code = tBlockDataInit(pBlockData, &id, pReader->skmTable.pTSchema, NULL, 0);
H
Hongze Cheng 已提交
344 345 346 347 348 349 350 351 352 353 354
    if (code) goto _err;

    while (pRowInfo->suid == id.suid && pRowInfo->uid == id.uid) {
      code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, NULL, pRowInfo->uid);
      if (code) goto _err;

      code = tsdbSnapNextRow(pReader);
      if (code) goto _err;

      pRowInfo = tsdbSnapGetRow(pReader);
      if (pRowInfo == NULL) {
H
Hongze Cheng 已提交
355 356
        tsdbDataFReaderClose(&pReader->pDataFReader);
        break;
H
Hongze Cheng 已提交
357
      }
H
Hongze Cheng 已提交
358 359

      if (pBlockData->nRow >= 4096) break;
H
Hongze Cheng 已提交
360
    }
H
Hongze Cheng 已提交
361 362 363 364 365

    code = tsdbSnapCmprData(pReader, ppData);
    if (code) goto _err;

    break;
H
Hongze Cheng 已提交
366 367
  }

H
Hongze Cheng 已提交
368 369 370
  return code;

_err:
S
Shengliang Guan 已提交
371
  tsdbError("vgId:%d, vnode snapshot tsdb read data for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
C
Cary Xu 已提交
372
            tstrerror(code));
H
Hongze Cheng 已提交
373 374 375 376 377 378
  return code;
}

static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t   code = 0;
  STsdb*    pTsdb = pReader->pTsdb;
H
Hongze Cheng 已提交
379
  SDelFile* pDelFile = pReader->fs.pDelFile;
H
Hongze Cheng 已提交
380 381 382 383 384 385 386

  if (pReader->pDelFReader == NULL) {
    if (pDelFile == NULL) {
      goto _exit;
    }

    // open
H
Hongze Cheng 已提交
387
    code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pTsdb);
H
Hongze Cheng 已提交
388 389
    if (code) goto _err;

H
Hongze Cheng 已提交
390
    // read index
H
Hongze Cheng 已提交
391
    code = tsdbReadDelIdx(pReader->pDelFReader, pReader->aDelIdx);
H
Hongze Cheng 已提交
392 393 394 395 396
    if (code) goto _err;

    pReader->iDelIdx = 0;
  }

H
Hongze Cheng 已提交
397 398 399 400 401 402
  while (true) {
    if (pReader->iDelIdx >= taosArrayGetSize(pReader->aDelIdx)) {
      tsdbDelFReaderClose(&pReader->pDelFReader);
      break;
    }

H
Hongze Cheng 已提交
403 404
    SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pReader->aDelIdx, pReader->iDelIdx);

H
Hongze Cheng 已提交
405 406
    pReader->iDelIdx++;

H
Hongze Cheng 已提交
407
    code = tsdbReadDelData(pReader->pDelFReader, pDelIdx, pReader->aDelData);
H
Hongze Cheng 已提交
408 409
    if (code) goto _err;

H
Hongze Cheng 已提交
410
    int32_t size = 0;
H
Hongze Cheng 已提交
411 412 413 414
    for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) {
      SDelData* pDelData = (SDelData*)taosArrayGet(pReader->aDelData, iDelData);

      if (pDelData->version >= pReader->sver && pDelData->version <= pReader->ever) {
H
Hongze Cheng 已提交
415
        size += tPutDelData(NULL, pDelData);
H
Hongze Cheng 已提交
416 417
      }
    }
H
Hongze Cheng 已提交
418
    if (size == 0) continue;
H
Hongze Cheng 已提交
419

H
Hongze Cheng 已提交
420 421 422 423 424 425 426
    // org data
    size = sizeof(TABLEID) + size;
    *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
    if (*ppData == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
H
Hongze Cheng 已提交
427

H
Hongze Cheng 已提交
428
    SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
C
Cary Xu 已提交
429
    pHdr->type = SNAP_DATA_DEL;
H
Hongze Cheng 已提交
430
    pHdr->size = size;
H
Hongze Cheng 已提交
431

H
Hongze Cheng 已提交
432 433 434 435 436 437
    TABLEID* pId = (TABLEID*)(&pHdr[1]);
    pId->suid = pDelIdx->suid;
    pId->uid = pDelIdx->uid;
    int32_t n = sizeof(SSnapDataHdr) + sizeof(TABLEID);
    for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) {
      SDelData* pDelData = (SDelData*)taosArrayGet(pReader->aDelData, iDelData);
H
Hongze Cheng 已提交
438

H
Hongze Cheng 已提交
439 440
      if (pDelData->version < pReader->sver) continue;
      if (pDelData->version > pReader->ever) continue;
H
Hongze Cheng 已提交
441

H
Hongze Cheng 已提交
442
      n += tPutDelData((*ppData) + n, pDelData);
H
Hongze Cheng 已提交
443 444
    }

H
Hongze Cheng 已提交
445
    tsdbInfo("vgId:%d, vnode snapshot tsdb read del data for %s, suid:%" PRId64 " uid:%" PRId64 " size:%d",
C
Cary Xu 已提交
446
             TD_VID(pTsdb->pVnode), pTsdb->path, pDelIdx->suid, pDelIdx->uid, size);
H
Hongze Cheng 已提交
447 448 449

    break;
  }
H
Hongze Cheng 已提交
450 451 452 453 454

_exit:
  return code;

_err:
H
Hongze Cheng 已提交
455
  tsdbError("vgId:%d, vnode snapshot tsdb read del for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
C
Cary Xu 已提交
456
            tstrerror(code));
H
Hongze Cheng 已提交
457 458
  return code;
}
H
more  
Hongze Cheng 已提交
459

C
Cary Xu 已提交
460
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, STsdbSnapReader** ppReader) {
H
Hongze Cheng 已提交
461
  int32_t          code = 0;
H
Hongze Cheng 已提交
462
  int32_t          lino = 0;
H
Hongze Cheng 已提交
463
  STsdbSnapReader* pReader = NULL;
H
Hongze Cheng 已提交
464

H
more  
Hongze Cheng 已提交
465
  // alloc
H
Hongze Cheng 已提交
466
  pReader = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
H
more  
Hongze Cheng 已提交
467 468
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
469
    TSDB_CHECK_CODE(code, lino, _exit);
H
more  
Hongze Cheng 已提交
470 471 472 473
  }
  pReader->pTsdb = pTsdb;
  pReader->sver = sver;
  pReader->ever = ever;
C
Cary Xu 已提交
474
  pReader->type = type;
H
more  
Hongze Cheng 已提交
475

H
Hongze Cheng 已提交
476 477 478
  code = taosThreadRwlockRdlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
H
Hongze Cheng 已提交
479
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
480 481 482 483 484
  }

  code = tsdbFSRef(pTsdb, &pReader->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
H
Hongze Cheng 已提交
485
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
486 487 488 489 490
  }

  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
H
Hongze Cheng 已提交
491
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
492 493
  }

H
Hongze Cheng 已提交
494
  // data
H
Hongze Cheng 已提交
495
  pReader->fid = INT32_MIN;
H
Hongze Cheng 已提交
496 497 498 499 500 501 502
  for (int32_t iIter = 0; iIter < sizeof(pReader->aFDataIter) / sizeof(pReader->aFDataIter[0]); iIter++) {
    SFDataIter* pIter = &pReader->aFDataIter[iIter];

    if (iIter == 0) {
      pIter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
      if (pIter->aBlockIdx == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
503
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
504 505 506 507 508
      }
    } else {
      pIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
      if (pIter->aSttBlk == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
509
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
510 511 512 513
      }
    }

    code = tBlockDataCreate(&pIter->bData);
H
Hongze Cheng 已提交
514
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
515
  }
H
Hongze Cheng 已提交
516 517

  code = tBlockDataCreate(&pReader->bData);
H
Hongze Cheng 已提交
518
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
519

H
Hongze Cheng 已提交
520
  // del
H
Hongze Cheng 已提交
521 522 523
  pReader->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
  if (pReader->aDelIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
524
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
525 526 527 528
  }
  pReader->aDelData = taosArrayInit(0, sizeof(SDelData));
  if (pReader->aDelData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
529
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
530 531
  }

H
Hongze Cheng 已提交
532 533
_exit:
  if (code) {
S
Shengliang Guan 已提交
534
    tsdbError("vgId:%d, %s failed at line %d since %s, TSDB path: %s", TD_VID(pTsdb->pVnode), __func__, lino,
H
Hongze Cheng 已提交
535
              tstrerror(code), pTsdb->path);
H
Hongze Cheng 已提交
536 537 538 539 540 541 542 543 544 545 546 547 548
    *ppReader = NULL;

    if (pReader) {
      taosArrayDestroy(pReader->aDelData);
      taosArrayDestroy(pReader->aDelIdx);
      tBlockDataDestroy(&pReader->bData, 1);
      tsdbFSDestroy(&pReader->fs);
      taosMemoryFree(pReader);
    }
  } else {
    *ppReader = pReader;
    tsdbInfo("vgId:%d, vnode snapshot tsdb reader opened for %s", TD_VID(pTsdb->pVnode), pTsdb->path);
  }
H
more  
Hongze Cheng 已提交
549
  return code;
H
Hongze Cheng 已提交
550 551
}

H
Hongze Cheng 已提交
552
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
H
Hongze Cheng 已提交
553
  int32_t          code = 0;
H
Hongze Cheng 已提交
554 555
  STsdbSnapReader* pReader = *ppReader;

H
Hongze Cheng 已提交
556 557 558 559 560 561 562 563 564 565 566 567 568
  // data
  if (pReader->pDataFReader) tsdbDataFReaderClose(&pReader->pDataFReader);
  for (int32_t iIter = 0; iIter < sizeof(pReader->aFDataIter) / sizeof(pReader->aFDataIter[0]); iIter++) {
    SFDataIter* pIter = &pReader->aFDataIter[iIter];

    if (iIter == 0) {
      taosArrayDestroy(pIter->aBlockIdx);
      tMapDataClear(&pIter->mBlock);
    } else {
      taosArrayDestroy(pIter->aSttBlk);
    }

    tBlockDataDestroy(&pIter->bData, 1);
H
Hongze Cheng 已提交
569
  }
H
Hongze Cheng 已提交
570 571

  tBlockDataDestroy(&pReader->bData, 1);
572
  tDestroyTSchema(pReader->skmTable.pTSchema);
H
Hongze Cheng 已提交
573 574 575

  // del
  if (pReader->pDelFReader) tsdbDelFReaderClose(&pReader->pDelFReader);
H
Hongze Cheng 已提交
576 577 578
  taosArrayDestroy(pReader->aDelIdx);
  taosArrayDestroy(pReader->aDelData);

H
Hongze Cheng 已提交
579 580
  tsdbFSUnref(pReader->pTsdb, &pReader->fs);

S
Shengliang Guan 已提交
581
  tsdbInfo("vgId:%d, vnode snapshot tsdb reader closed for %s", TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path);
H
Hongze Cheng 已提交
582

H
Hongze Cheng 已提交
583 584 585 586
  for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(pReader->aBuf[0]); iBuf++) {
    tFree(pReader->aBuf[iBuf]);
  }

H
Hongze Cheng 已提交
587 588
  taosMemoryFree(pReader);
  *ppReader = NULL;
H
Hongze Cheng 已提交
589 590 591 592
  return code;
}

int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
H
more  
Hongze Cheng 已提交
593
  int32_t code = 0;
H
Hongze Cheng 已提交
594

H
Hongze Cheng 已提交
595 596
  *ppData = NULL;

H
Hongze Cheng 已提交
597
  // read data file
H
Hongze Cheng 已提交
598 599 600
  if (!pReader->dataDone) {
    code = tsdbSnapReadData(pReader, ppData);
    if (code) {
H
Hongze Cheng 已提交
601 602 603 604
      goto _err;
    } else {
      if (*ppData) {
        goto _exit;
H
Hongze Cheng 已提交
605
      } else {
H
Hongze Cheng 已提交
606
        pReader->dataDone = 1;
H
Hongze Cheng 已提交
607 608 609
      }
    }
  }
H
Hongze Cheng 已提交
610 611

  // read del file
H
Hongze Cheng 已提交
612 613 614
  if (!pReader->delDone) {
    code = tsdbSnapReadDel(pReader, ppData);
    if (code) {
H
Hongze Cheng 已提交
615 616 617 618
      goto _err;
    } else {
      if (*ppData) {
        goto _exit;
H
Hongze Cheng 已提交
619
      } else {
H
Hongze Cheng 已提交
620
        pReader->delDone = 1;
H
Hongze Cheng 已提交
621 622 623
      }
    }
  }
H
Hongze Cheng 已提交
624

H
Hongze Cheng 已提交
625
_exit:
S
Shengliang Guan 已提交
626
  tsdbDebug("vgId:%d, vnode snapshot tsdb read for %s", TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path);
H
more  
Hongze Cheng 已提交
627 628 629
  return code;

_err:
S
Shengliang Guan 已提交
630
  tsdbError("vgId:%d, vnode snapshot tsdb read for %s failed since %s", TD_VID(pReader->pTsdb->pVnode),
C
Cary Xu 已提交
631
            pReader->pTsdb->path, tstrerror(code));
H
more  
Hongze Cheng 已提交
632 633 634
  return code;
}

H
Hongze Cheng 已提交
635
// STsdbSnapWriter ========================================
H
Hongze Cheng 已提交
636 637 638 639
struct STsdbSnapWriter {
  STsdb*  pTsdb;
  int64_t sver;
  int64_t ever;
H
Hongze Cheng 已提交
640
  STsdbFS fs;
H
Hongze Cheng 已提交
641

H
Hongze Cheng 已提交
642
  // config
H
Hongze Cheng 已提交
643 644 645 646 647 648
  int32_t  minutes;
  int8_t   precision;
  int32_t  minRow;
  int32_t  maxRow;
  int8_t   cmprAlg;
  int64_t  commitID;
H
Hongze Cheng 已提交
649
  uint8_t* aBuf[5];
H
Hongze Cheng 已提交
650

H
Hongze Cheng 已提交
651
  // for data file
H
Hongze Cheng 已提交
652
  SBlockData bData;
H
Hongze Cheng 已提交
653 654 655
  int32_t    fid;
  TABLEID    id;
  SSkmInfo   skmTable;
H
Hongze Cheng 已提交
656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673
  struct {
    SDataFReader* pReader;
    SArray*       aBlockIdx;
    int32_t       iBlockIdx;
    SBlockIdx*    pBlockIdx;
    SMapData      mDataBlk;
    int32_t       iDataBlk;
    SBlockData    bData;
    int32_t       iRow;
  } dReader;
  struct {
    SDataFWriter* pWriter;
    SArray*       aBlockIdx;
    SMapData      mDataBlk;
    SArray*       aSttBlk;
    SBlockData    bData;
    SBlockData    sData;
  } dWriter;
H
Hongze Cheng 已提交
674

H
Hongze Cheng 已提交
675
  // for del file
H
Hongze Cheng 已提交
676
  SDelFReader* pDelFReader;
H
Hongze Cheng 已提交
677
  SDelFWriter* pDelFWriter;
H
Hongze Cheng 已提交
678
  int32_t      iDelIdx;
H
Hongze Cheng 已提交
679
  SArray*      aDelIdxR;
H
Hongze Cheng 已提交
680
  SArray*      aDelData;
H
Hongze Cheng 已提交
681
  SArray*      aDelIdxW;
H
Hongze Cheng 已提交
682 683
};

H
Hongze Cheng 已提交
684
// SNAP_DATA_TSDB
H
Hongze Cheng 已提交
685 686 687
extern int32_t tsdbWriteDataBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SMapData* mDataBlk, int8_t cmprAlg);
extern int32_t tsdbWriteSttBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SArray* aSttBlk, int8_t cmprAlg);

H
Hongze Cheng 已提交
688
static int32_t tsdbSnapNextTableData(STsdbSnapWriter* pWriter) {
H
Hongze Cheng 已提交
689
  int32_t code = 0;
H
Hongze Cheng 已提交
690

691
  ASSERT(pWriter->dReader.iRow >= pWriter->dReader.bData.nRow);
H
Hongze Cheng 已提交
692

H
Hongze Cheng 已提交
693 694 695 696 697 698
  if (pWriter->dReader.iBlockIdx < taosArrayGetSize(pWriter->dReader.aBlockIdx)) {
    pWriter->dReader.pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->dReader.aBlockIdx, pWriter->dReader.iBlockIdx);

    code = tsdbReadDataBlk(pWriter->dReader.pReader, pWriter->dReader.pBlockIdx, &pWriter->dReader.mDataBlk);
    if (code) goto _exit;

H
Hongze Cheng 已提交
699
    pWriter->dReader.iBlockIdx++;
H
Hongze Cheng 已提交
700 701
  } else {
    pWriter->dReader.pBlockIdx = NULL;
H
Hongze Cheng 已提交
702
    tMapDataReset(&pWriter->dReader.mDataBlk);
H
Hongze Cheng 已提交
703
  }
H
Hongze Cheng 已提交
704 705 706
  pWriter->dReader.iDataBlk = 0;  // point to the next one
  tBlockDataReset(&pWriter->dReader.bData);
  pWriter->dReader.iRow = 0;
H
Hongze Cheng 已提交
707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729

_exit:
  return code;
}

static int32_t tsdbSnapWriteCopyData(STsdbSnapWriter* pWriter, TABLEID* pId) {
  int32_t code = 0;

  while (true) {
    if (pWriter->dReader.pBlockIdx == NULL) break;
    if (tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, pId) >= 0) break;

    SBlockIdx blkIdx = *pWriter->dReader.pBlockIdx;
    code = tsdbWriteDataBlk(pWriter->dWriter.pWriter, &pWriter->dReader.mDataBlk, &blkIdx);
    if (code) goto _exit;

    if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blkIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _exit;
    }

    code = tsdbSnapNextTableData(pWriter);
    if (code) goto _exit;
H
Hongze Cheng 已提交
730 731
  }

H
Hongze Cheng 已提交
732 733 734 735
_exit:
  return code;
}

H
Hongze Cheng 已提交
736 737
static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pId) {
  int32_t code = 0;
H
Hongze Cheng 已提交
738 739 740 741 742 743 744 745 746 747 748

  code = tsdbSnapWriteCopyData(pWriter, pId);
  if (code) goto _err;

  pWriter->id.suid = pId->suid;
  pWriter->id.uid = pId->uid;

  code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pId->suid, pId->uid, &pWriter->skmTable);
  if (code) goto _err;

  tMapDataReset(&pWriter->dWriter.mDataBlk);
H
Hongze Cheng 已提交
749
  code = tBlockDataInit(&pWriter->dWriter.bData, pId, pWriter->skmTable.pTSchema, NULL, 0);
H
Hongze Cheng 已提交
750 751 752 753 754
  if (code) goto _err;

  return code;

_err:
S
Shengliang Guan 已提交
755
  tsdbError("vgId:%d, %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
H
Hongze Cheng 已提交
756 757 758 759 760 761 762 763
  return code;
}

static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
  int32_t code = 0;

  if (pWriter->id.suid == 0 && pWriter->id.uid == 0) return code;

H
Hongze Cheng 已提交
764 765 766
  int32_t c = 1;
  if (pWriter->dReader.pBlockIdx) {
    c = tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, &pWriter->id);
767
    ASSERT(c >= 0);
H
Hongze Cheng 已提交
768 769 770 771 772
  }

  if (c == 0) {
    SBlockData* pBData = &pWriter->dWriter.bData;

H
Hongze Cheng 已提交
773 774
    for (; pWriter->dReader.iRow < pWriter->dReader.bData.nRow; pWriter->dReader.iRow++) {
      TSDBROW row = tsdbRowFromBlockData(&pWriter->dReader.bData, pWriter->dReader.iRow);
H
Hongze Cheng 已提交
775 776

      code = tBlockDataAppendRow(pBData, &row, NULL, pWriter->id.uid);
H
Hongze Cheng 已提交
777 778
      if (code) goto _err;

H
Hongze Cheng 已提交
779 780
      if (pBData->nRow >= pWriter->maxRow) {
        code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, pBData, &pWriter->dWriter.mDataBlk, pWriter->cmprAlg);
H
Hongze Cheng 已提交
781 782 783 784
        if (code) goto _err;
      }
    }

H
Hongze Cheng 已提交
785
    code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, pBData, &pWriter->dWriter.mDataBlk, pWriter->cmprAlg);
H
Hongze Cheng 已提交
786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809
    if (code) goto _err;

    for (; pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem; pWriter->dReader.iDataBlk++) {
      SDataBlk dataBlk;
      tMapDataGetItemByIdx(&pWriter->dReader.mDataBlk, pWriter->dReader.iDataBlk, &dataBlk, tGetDataBlk);

      code = tMapDataPutItem(&pWriter->dWriter.mDataBlk, &dataBlk, tPutDataBlk);
      if (code) goto _err;
    }

    code = tsdbSnapNextTableData(pWriter);
    if (code) goto _err;
  }

  if (pWriter->dWriter.mDataBlk.nItem) {
    SBlockIdx blockIdx = {.suid = pWriter->id.suid, .uid = pWriter->id.uid};
    code = tsdbWriteDataBlk(pWriter->dWriter.pWriter, &pWriter->dWriter.mDataBlk, &blockIdx);

    if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }

H
Hongze Cheng 已提交
810 811 812
  pWriter->id.suid = 0;
  pWriter->id.uid = 0;

H
Hongze Cheng 已提交
813 814 815 816 817 818
  return code;

_err:
  return code;
}

H
Hongze Cheng 已提交
819 820 821 822
static int32_t tsdbSnapWriteOpenFile(STsdbSnapWriter* pWriter, int32_t fid) {
  int32_t code = 0;
  STsdb*  pTsdb = pWriter->pTsdb;

823
  ASSERT(pWriter->dWriter.pWriter == NULL);
H
Hongze Cheng 已提交
824 825

  pWriter->fid = fid;
H
Hongze Cheng 已提交
826
  pWriter->id = (TABLEID){0};
H
Hongze Cheng 已提交
827 828
  SDFileSet* pSet = taosArraySearch(pWriter->fs.aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ);

H
Hongze Cheng 已提交
829
  // Reader
H
Hongze Cheng 已提交
830 831 832 833 834 835 836
  if (pSet) {
    code = tsdbDataFReaderOpen(&pWriter->dReader.pReader, pWriter->pTsdb, pSet);
    if (code) goto _err;

    code = tsdbReadBlockIdx(pWriter->dReader.pReader, pWriter->dReader.aBlockIdx);
    if (code) goto _err;
  } else {
837
    ASSERT(pWriter->dReader.pReader == NULL);
H
Hongze Cheng 已提交
838
    taosArrayClear(pWriter->dReader.aBlockIdx);
H
Hongze Cheng 已提交
839
  }
H
Hongze Cheng 已提交
840 841 842
  pWriter->dReader.iBlockIdx = 0;  // point to the next one
  code = tsdbSnapNextTableData(pWriter);
  if (code) goto _err;
H
Hongze Cheng 已提交
843

H
Hongze Cheng 已提交
844
  // Writer
H
Hongze Cheng 已提交
845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865
  SHeadFile fHead = {.commitID = pWriter->commitID};
  SDataFile fData = {.commitID = pWriter->commitID};
  SSmaFile  fSma = {.commitID = pWriter->commitID};
  SSttFile  fStt = {.commitID = pWriter->commitID};
  SDFileSet wSet = {.fid = pWriter->fid, .pHeadF = &fHead, .pDataF = &fData, .pSmaF = &fSma};
  if (pSet) {
    wSet.diskId = pSet->diskId;
    fData = *pSet->pDataF;
    fSma = *pSet->pSmaF;
    for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
      wSet.aSttF[iStt] = pSet->aSttF[iStt];
    }
    wSet.nSttF = pSet->nSttF + 1;  // TODO: fix pSet->nSttF == pTsdb->maxFile
  } else {
    SDiskID did = {0};
    tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did);
    tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did);
    wSet.diskId = did;
    wSet.nSttF = 1;
  }
  wSet.aSttF[wSet.nSttF - 1] = &fStt;
H
Hongze Cheng 已提交
866

H
Hongze Cheng 已提交
867
  code = tsdbDataFWriterOpen(&pWriter->dWriter.pWriter, pWriter->pTsdb, &wSet);
H
Hongze Cheng 已提交
868
  if (code) goto _err;
H
Hongze Cheng 已提交
869 870 871
  taosArrayClear(pWriter->dWriter.aBlockIdx);
  tMapDataReset(&pWriter->dWriter.mDataBlk);
  taosArrayClear(pWriter->dWriter.aSttBlk);
H
Hongze Cheng 已提交
872 873
  tBlockDataReset(&pWriter->dWriter.bData);
  tBlockDataReset(&pWriter->dWriter.sData);
H
Hongze Cheng 已提交
874 875 876 877 878 879 880 881 882 883

  return code;

_err:
  return code;
}

static int32_t tsdbSnapWriteCloseFile(STsdbSnapWriter* pWriter) {
  int32_t code = 0;

884
  ASSERT(pWriter->dWriter.pWriter);
H
Hongze Cheng 已提交
885

H
Hongze Cheng 已提交
886 887
  code = tsdbSnapWriteTableDataEnd(pWriter);
  if (code) goto _err;
H
Hongze Cheng 已提交
888

H
Hongze Cheng 已提交
889 890 891
  // copy remain table data
  TABLEID id = {.suid = INT64_MAX, .uid = INT64_MAX};
  code = tsdbSnapWriteCopyData(pWriter, &id);
H
Hongze Cheng 已提交
892
  if (code) goto _err;
H
Hongze Cheng 已提交
893

H
Hongze Cheng 已提交
894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916
  code =
      tsdbWriteSttBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.sData, pWriter->dWriter.aSttBlk, pWriter->cmprAlg);
  if (code) goto _err;

  // Indices
  code = tsdbWriteBlockIdx(pWriter->dWriter.pWriter, pWriter->dWriter.aBlockIdx);
  if (code) goto _err;

  code = tsdbWriteSttBlk(pWriter->dWriter.pWriter, pWriter->dWriter.aSttBlk);
  if (code) goto _err;

  code = tsdbUpdateDFileSetHeader(pWriter->dWriter.pWriter);
  if (code) goto _err;

  code = tsdbFSUpsertFSet(&pWriter->fs, &pWriter->dWriter.pWriter->wSet);
  if (code) goto _err;

  code = tsdbDataFWriterClose(&pWriter->dWriter.pWriter, 1);
  if (code) goto _err;

  if (pWriter->dReader.pReader) {
    code = tsdbDataFReaderClose(&pWriter->dReader.pReader);
    if (code) goto _err;
H
Hongze Cheng 已提交
917 918 919 920
  }

_exit:
  return code;
H
Hongze Cheng 已提交
921 922 923

_err:
  return code;
H
Hongze Cheng 已提交
924 925
}

H
Hongze Cheng 已提交
926
static int32_t tsdbSnapWriteToDataFile(STsdbSnapWriter* pWriter, int32_t iRow, int8_t* done) {
H
Hongze Cheng 已提交
927 928
  int32_t code = 0;

H
Hongze Cheng 已提交
929 930 931
  SBlockData* pBData = &pWriter->bData;
  TABLEID     id = {.suid = pBData->suid, .uid = pBData->uid ? pBData->uid : pBData->aUid[iRow]};
  TSDBROW     row = tsdbRowFromBlockData(pBData, iRow);
H
Hongze Cheng 已提交
932 933
  TSDBKEY     key = TSDBROW_KEY(&row);

H
Hongze Cheng 已提交
934 935 936 937
  *done = 0;
  while (pWriter->dReader.iRow < pWriter->dReader.bData.nRow ||
         pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem) {
    // Merge row by row
H
Hongze Cheng 已提交
938 939 940
    for (; pWriter->dReader.iRow < pWriter->dReader.bData.nRow; pWriter->dReader.iRow++) {
      TSDBROW trow = tsdbRowFromBlockData(&pWriter->dReader.bData, pWriter->dReader.iRow);
      TSDBKEY tKey = TSDBROW_KEY(&trow);
H
Hongze Cheng 已提交
941

942
      ASSERT(pWriter->dReader.bData.suid == id.suid && pWriter->dReader.bData.uid == id.uid);
H
Hongze Cheng 已提交
943

H
Hongze Cheng 已提交
944 945 946 947 948 949 950 951
      int32_t c = tsdbKeyCmprFn(&key, &tKey);
      if (c < 0) {
        code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, id.uid);
        if (code) goto _err;
      } else if (c > 0) {
        code = tBlockDataAppendRow(&pWriter->dWriter.bData, &trow, NULL, id.uid);
        if (code) goto _err;
      } else {
952
        ASSERT(0);
H
Hongze Cheng 已提交
953
      }
H
Hongze Cheng 已提交
954

H
Hongze Cheng 已提交
955
      if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) {
H
Hongze Cheng 已提交
956 957 958
        code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk,
                                  pWriter->cmprAlg);
        if (code) goto _err;
H
Hongze Cheng 已提交
959 960
      }

H
Hongze Cheng 已提交
961 962 963 964
      if (c < 0) {
        *done = 1;
        goto _exit;
      }
H
Hongze Cheng 已提交
965
    }
H
Hongze Cheng 已提交
966

H
Hongze Cheng 已提交
967
    // Merge row by block
H
Hongze Cheng 已提交
968
    SDataBlk tDataBlk = {.minKey = key, .maxKey = key};
H
Hongze Cheng 已提交
969
    for (; pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem; pWriter->dReader.iDataBlk++) {
H
Hongze Cheng 已提交
970
      SDataBlk dataBlk;
H
Hongze Cheng 已提交
971
      tMapDataGetItemByIdx(&pWriter->dReader.mDataBlk, pWriter->dReader.iDataBlk, &dataBlk, tGetDataBlk);
H
Hongze Cheng 已提交
972 973 974

      int32_t c = tDataBlkCmprFn(&dataBlk, &tDataBlk);
      if (c < 0) {
H
Hongze Cheng 已提交
975 976 977 978
        code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk,
                                  pWriter->cmprAlg);
        if (code) goto _err;

H
Hongze Cheng 已提交
979 980
        code = tMapDataPutItem(&pWriter->dWriter.mDataBlk, &dataBlk, tPutDataBlk);
        if (code) goto _err;
H
Hongze Cheng 已提交
981
      } else if (c > 0) {
H
Hongze Cheng 已提交
982 983 984 985
        code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, id.uid);
        if (code) goto _err;

        if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) {
H
Hongze Cheng 已提交
986 987 988
          code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk,
                                    pWriter->cmprAlg);
          if (code) goto _err;
H
Hongze Cheng 已提交
989
        }
H
Hongze Cheng 已提交
990 991 992

        *done = 1;
        goto _exit;
H
Hongze Cheng 已提交
993 994 995
      } else {
        code = tsdbReadDataBlockEx(pWriter->dReader.pReader, &dataBlk, &pWriter->dReader.bData);
        if (code) goto _err;
H
Hongze Cheng 已提交
996
        pWriter->dReader.iRow = 0;
H
Hongze Cheng 已提交
997

H
Hongze Cheng 已提交
998 999
        pWriter->dReader.iDataBlk++;
        break;
H
Hongze Cheng 已提交
1000
      }
H
Hongze Cheng 已提交
1001
    }
H
Hongze Cheng 已提交
1002
  }
H
Hongze Cheng 已提交
1003

H
Hongze Cheng 已提交
1004 1005 1006 1007
_exit:
  return code;

_err:
S
Shengliang Guan 已提交
1008
  tsdbError("vgId:%d, %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
H
Hongze Cheng 已提交
1009 1010 1011
  return code;
}

H
Hongze Cheng 已提交
1012
static int32_t tsdbSnapWriteToSttFile(STsdbSnapWriter* pWriter, int32_t iRow) {
H
Hongze Cheng 已提交
1013 1014
  int32_t code = 0;

H
Hongze Cheng 已提交
1015 1016 1017 1018
  TABLEID     id = {.suid = pWriter->bData.suid,
                    .uid = pWriter->bData.uid ? pWriter->bData.uid : pWriter->bData.aUid[iRow]};
  TSDBROW     row = tsdbRowFromBlockData(&pWriter->bData, iRow);
  SBlockData* pBData = &pWriter->dWriter.sData;
H
Hongze Cheng 已提交
1019

H
Hongze Cheng 已提交
1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033
  if (pBData->suid || pBData->uid) {
    if (!TABLE_SAME_SCHEMA(pBData->suid, pBData->uid, id.suid, id.uid)) {
      code = tsdbWriteSttBlock(pWriter->dWriter.pWriter, pBData, pWriter->dWriter.aSttBlk, pWriter->cmprAlg);
      if (code) goto _err;

      pBData->suid = 0;
      pBData->uid = 0;
    }
  }

  if (pBData->suid == 0 && pBData->uid == 0) {
    code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pWriter->id.suid, pWriter->id.uid, &pWriter->skmTable);
    if (code) goto _err;

H
Hongze Cheng 已提交
1034 1035
    TABLEID tid = {.suid = pWriter->id.suid, .uid = pWriter->id.suid ? 0 : pWriter->id.uid};
    code = tBlockDataInit(pBData, &tid, pWriter->skmTable.pTSchema, NULL, 0);
H
Hongze Cheng 已提交
1036 1037 1038 1039
    if (code) goto _err;
  }

  code = tBlockDataAppendRow(pBData, &row, NULL, id.uid);
H
Hongze Cheng 已提交
1040 1041
  if (code) goto _err;

H
Hongze Cheng 已提交
1042 1043
  if (pBData->nRow >= pWriter->maxRow) {
    code = tsdbWriteSttBlock(pWriter->dWriter.pWriter, pBData, pWriter->dWriter.aSttBlk, pWriter->cmprAlg);
H
Hongze Cheng 已提交
1044
    if (code) goto _err;
H
Hongze Cheng 已提交
1045 1046
  }

H
Hongze Cheng 已提交
1047 1048 1049 1050 1051 1052 1053
_exit:
  return code;

_err:
  return code;
}

H
Hongze Cheng 已提交
1054 1055 1056 1057 1058 1059 1060
static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) {
  int32_t code = 0;

  SBlockData* pBlockData = &pWriter->bData;
  TABLEID     id = {.suid = pBlockData->suid, .uid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[iRow]};

  // End last table data write if need
H
Hongze Cheng 已提交
1061
  if (tTABLEIDCmprFn(&pWriter->id, &id) != 0) {
H
Hongze Cheng 已提交
1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073
    code = tsdbSnapWriteTableDataEnd(pWriter);
    if (code) goto _err;
  }

  // Start new table data write if need
  if (pWriter->id.suid == 0 && pWriter->id.uid == 0) {
    code = tsdbSnapWriteTableDataStart(pWriter, &id);
    if (code) goto _err;
  }

  // Merge with .data file data
  int8_t done = 0;
H
Hongze Cheng 已提交
1074 1075 1076 1077
  if (pWriter->dReader.pBlockIdx && tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, &id) == 0) {
    code = tsdbSnapWriteToDataFile(pWriter, iRow, &done);
    if (code) goto _err;
  }
H
Hongze Cheng 已提交
1078 1079 1080

  // Append to the .stt data block (todo: check if need to set/reload sst block)
  if (!done) {
H
Hongze Cheng 已提交
1081
    code = tsdbSnapWriteToSttFile(pWriter, iRow);
H
Hongze Cheng 已提交
1082 1083
    if (code) goto _err;
  }
H
Hongze Cheng 已提交
1084

H
Hongze Cheng 已提交
1085 1086 1087 1088
_exit:
  return code;

_err:
S
Shengliang Guan 已提交
1089
  tsdbError("vgId:%d, %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
H
Hongze Cheng 已提交
1090 1091 1092
  return code;
}

H
Hongze Cheng 已提交
1093 1094 1095 1096 1097 1098 1099 1100
static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
  int32_t     code = 0;
  STsdb*      pTsdb = pWriter->pTsdb;
  SBlockData* pBlockData = &pWriter->bData;

  // Decode data
  SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
  code = tDecmprBlockData(pHdr->data, pHdr->size, pBlockData, pWriter->aBuf);
H
Hongze Cheng 已提交
1101
  if (code) goto _err;
H
Hongze Cheng 已提交
1102

1103
  ASSERT(pBlockData->nRow > 0);
H
Hongze Cheng 已提交
1104

H
Hongze Cheng 已提交
1105 1106 1107 1108
  // Loop to handle each row
  for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
    TSKEY   ts = pBlockData->aTSKEY[iRow];
    int32_t fid = tsdbKeyFid(ts, pWriter->minutes, pWriter->precision);
H
Hongze Cheng 已提交
1109

H
Hongze Cheng 已提交
1110
    if (pWriter->dWriter.pWriter == NULL || pWriter->fid != fid) {
H
Hongze Cheng 已提交
1111
      if (pWriter->dWriter.pWriter) {
1112
        ASSERT(fid > pWriter->fid);
H
Hongze Cheng 已提交
1113 1114 1115 1116 1117

        code = tsdbSnapWriteCloseFile(pWriter);
        if (code) goto _err;
      }

H
Hongze Cheng 已提交
1118 1119 1120 1121 1122 1123 1124 1125
      code = tsdbSnapWriteOpenFile(pWriter, fid);
      if (code) goto _err;
    }

    code = tsdbSnapWriteRowData(pWriter, iRow);
    if (code) goto _err;
  }

H
Hongze Cheng 已提交
1126 1127 1128
  return code;

_err:
S
Shengliang Guan 已提交
1129
  tsdbError("vgId:%d, vnode snapshot tsdb write data for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
C
Cary Xu 已提交
1130
            tstrerror(code));
H
Hongze Cheng 已提交
1131 1132 1133
  return code;
}

H
Hongze Cheng 已提交
1134
// SNAP_DATA_DEL
H
Hongze Cheng 已提交
1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163
static int32_t tsdbSnapMoveWriteDelData(STsdbSnapWriter* pWriter, TABLEID* pId) {
  int32_t code = 0;

  while (true) {
    if (pWriter->iDelIdx >= taosArrayGetSize(pWriter->aDelIdxR)) break;

    SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);

    if (tTABLEIDCmprFn(pDelIdx, pId) >= 0) break;

    code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData);
    if (code) goto _exit;

    SDelIdx delIdx = *pDelIdx;
    code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx);
    if (code) goto _exit;

    if (taosArrayPush(pWriter->aDelIdxW, &delIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _exit;
    }

    pWriter->iDelIdx++;
  }

_exit:
  return code;
}

H
Hongze Cheng 已提交
1164 1165
static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
  int32_t code = 0;
H
Hongze Cheng 已提交
1166 1167
  STsdb*  pTsdb = pWriter->pTsdb;

H
Hongze Cheng 已提交
1168
  // Open del file if not opened yet
H
Hongze Cheng 已提交
1169
  if (pWriter->pDelFWriter == NULL) {
H
Hongze Cheng 已提交
1170
    SDelFile* pDelFile = pWriter->fs.pDelFile;
H
Hongze Cheng 已提交
1171 1172 1173

    // reader
    if (pDelFile) {
H
Hongze Cheng 已提交
1174
      code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pDelFile, pTsdb);
H
Hongze Cheng 已提交
1175 1176
      if (code) goto _err;

H
Hongze Cheng 已提交
1177
      code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdxR);
H
Hongze Cheng 已提交
1178
      if (code) goto _err;
H
Hongze Cheng 已提交
1179 1180
    } else {
      taosArrayClear(pWriter->aDelIdxR);
H
Hongze Cheng 已提交
1181
    }
H
Hongze Cheng 已提交
1182
    pWriter->iDelIdx = 0;
H
Hongze Cheng 已提交
1183 1184

    // writer
H
Hongze Cheng 已提交
1185
    SDelFile delFile = {.commitID = pWriter->commitID};
H
Hongze Cheng 已提交
1186 1187
    code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &delFile, pTsdb);
    if (code) goto _err;
H
Hongze Cheng 已提交
1188
    taosArrayClear(pWriter->aDelIdxW);
H
Hongze Cheng 已提交
1189 1190
  }

H
Hongze Cheng 已提交
1191 1192
  SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
  TABLEID       id = *(TABLEID*)pHdr->data;
H
Hongze Cheng 已提交
1193

1194
  ASSERT(pHdr->size + sizeof(SSnapDataHdr) == nData);
H
Hongze Cheng 已提交
1195

H
Hongze Cheng 已提交
1196 1197 1198
  // Move write data < id
  code = tsdbSnapMoveWriteDelData(pWriter, &id);
  if (code) goto _err;
H
Hongze Cheng 已提交
1199

H
Hongze Cheng 已提交
1200
  // Merge incoming data with current
H
Hongze Cheng 已提交
1201 1202 1203
  if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR) &&
      tTABLEIDCmprFn(taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx), &id) == 0) {
    SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
H
Hongze Cheng 已提交
1204

H
Hongze Cheng 已提交
1205
    code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData);
H
Hongze Cheng 已提交
1206 1207
    if (code) goto _err;

H
Hongze Cheng 已提交
1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219
    pWriter->iDelIdx++;
  } else {
    taosArrayClear(pWriter->aDelData);
  }

  int64_t n = sizeof(SSnapDataHdr) + sizeof(TABLEID);
  while (n < nData) {
    SDelData delData;

    n += tGetDelData(pData + n, &delData);

    if (taosArrayPush(pWriter->aDelData, &delData) == NULL) {
H
Hongze Cheng 已提交
1220 1221 1222
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
H
Hongze Cheng 已提交
1223
  }
H
Hongze Cheng 已提交
1224

H
Hongze Cheng 已提交
1225 1226 1227 1228 1229 1230 1231
  SDelIdx delIdx = {.suid = id.suid, .uid = id.uid};
  code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx);
  if (code) goto _err;

  if (taosArrayPush(pWriter->aDelIdxW, &delIdx) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
H
Hongze Cheng 已提交
1232 1233 1234 1235 1236
  }

  return code;

_err:
S
Shengliang Guan 已提交
1237
  tsdbError("vgId:%d, vnode snapshot tsdb write del for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
C
Cary Xu 已提交
1238
            tstrerror(code));
H
Hongze Cheng 已提交
1239 1240 1241
  return code;
}

H
Hongze Cheng 已提交
1242 1243
static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
1244 1245
  STsdb*  pTsdb = pWriter->pTsdb;

H
Hongze Cheng 已提交
1246
  if (pWriter->pDelFWriter == NULL) return code;
H
Hongze Cheng 已提交
1247

H
Hongze Cheng 已提交
1248 1249 1250
  TABLEID id = {.suid = INT64_MAX, .uid = INT64_MAX};
  code = tsdbSnapMoveWriteDelData(pWriter, &id);
  if (code) goto _err;
H
Hongze Cheng 已提交
1251

H
Hongze Cheng 已提交
1252 1253 1254
  code = tsdbWriteDelIdx(pWriter->pDelFWriter, pWriter->aDelIdxW);
  if (code) goto _err;

H
Hongze Cheng 已提交
1255 1256 1257
  code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter);
  if (code) goto _err;

H
Hongze Cheng 已提交
1258
  code = tsdbFSUpsertDelFile(&pWriter->fs, &pWriter->pDelFWriter->fDel);
H
Hongze Cheng 已提交
1259 1260 1261 1262 1263 1264 1265 1266 1267 1268
  if (code) goto _err;

  code = tsdbDelFWriterClose(&pWriter->pDelFWriter, 1);
  if (code) goto _err;

  if (pWriter->pDelFReader) {
    code = tsdbDelFReaderClose(&pWriter->pDelFReader);
    if (code) goto _err;
  }

S
Shengliang Guan 已提交
1269
  tsdbInfo("vgId:%d, vnode snapshot tsdb write del for %s end", TD_VID(pTsdb->pVnode), pTsdb->path);
H
Hongze Cheng 已提交
1270 1271 1272
  return code;

_err:
S
Shengliang Guan 已提交
1273
  tsdbError("vgId:%d, vnode snapshot tsdb write del end for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
C
Cary Xu 已提交
1274
            tstrerror(code));
H
Hongze Cheng 已提交
1275 1276 1277
  return code;
}

H
Hongze Cheng 已提交
1278
// APIs
H
Hongze Cheng 已提交
1279
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) {
H
Hongze Cheng 已提交
1280
  int32_t          code = 0;
H
Hongze Cheng 已提交
1281
  int32_t          lino = 0;
H
Hongze Cheng 已提交
1282 1283 1284 1285 1286 1287
  STsdbSnapWriter* pWriter = NULL;

  // alloc
  pWriter = (STsdbSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
  if (pWriter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1288
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1289 1290 1291 1292 1293
  }
  pWriter->pTsdb = pTsdb;
  pWriter->sver = sver;
  pWriter->ever = ever;

H
Hongze Cheng 已提交
1294
  code = tsdbFSCopy(pTsdb, &pWriter->fs);
H
Hongze Cheng 已提交
1295
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1296

H
Hongze Cheng 已提交
1297
  // config
H
Hongze Cheng 已提交
1298 1299 1300 1301 1302
  pWriter->minutes = pTsdb->keepCfg.days;
  pWriter->precision = pTsdb->keepCfg.precision;
  pWriter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
  pWriter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
  pWriter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
H
Hongze Cheng 已提交
1303 1304
  pWriter->commitID = pTsdb->pVnode->state.commitID;

H
Hongze Cheng 已提交
1305
  // SNAP_DATA_TSDB
H
Hongze Cheng 已提交
1306
  code = tBlockDataCreate(&pWriter->bData);
H
Hongze Cheng 已提交
1307
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1308

H
Hongze Cheng 已提交
1309 1310 1311 1312 1313
  pWriter->fid = INT32_MIN;
  pWriter->id = (TABLEID){0};
  // Reader
  pWriter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pWriter->dReader.aBlockIdx == NULL) {
H
Hongze Cheng 已提交
1314
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1315
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1316
  }
H
Hongze Cheng 已提交
1317
  code = tBlockDataCreate(&pWriter->dReader.bData);
H
Hongze Cheng 已提交
1318
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1319

H
Hongze Cheng 已提交
1320 1321 1322
  // Writer
  pWriter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pWriter->dWriter.aBlockIdx == NULL) {
H
Hongze Cheng 已提交
1323
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1324
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1325
  }
H
Hongze Cheng 已提交
1326 1327
  pWriter->dWriter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
  if (pWriter->dWriter.aSttBlk == NULL) {
H
Hongze Cheng 已提交
1328
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1329
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1330
  }
H
Hongze Cheng 已提交
1331
  code = tBlockDataCreate(&pWriter->dWriter.bData);
H
Hongze Cheng 已提交
1332
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1333
  code = tBlockDataCreate(&pWriter->dWriter.sData);
H
Hongze Cheng 已提交
1334
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1335

H
Hongze Cheng 已提交
1336
  // SNAP_DATA_DEL
H
Hongze Cheng 已提交
1337 1338 1339
  pWriter->aDelIdxR = taosArrayInit(0, sizeof(SDelIdx));
  if (pWriter->aDelIdxR == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1340
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1341 1342 1343 1344
  }
  pWriter->aDelData = taosArrayInit(0, sizeof(SDelData));
  if (pWriter->aDelData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1345
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1346 1347 1348 1349
  }
  pWriter->aDelIdxW = taosArrayInit(0, sizeof(SDelIdx));
  if (pWriter->aDelIdxW == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1350
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1351
  }
H
Hongze Cheng 已提交
1352

H
Hongze Cheng 已提交
1353 1354
_exit:
  if (code) {
S
Shengliang Guan 已提交
1355
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372
    *ppWriter = NULL;

    if (pWriter) {
      if (pWriter->aDelIdxW) taosArrayDestroy(pWriter->aDelIdxW);
      if (pWriter->aDelData) taosArrayDestroy(pWriter->aDelData);
      if (pWriter->aDelIdxR) taosArrayDestroy(pWriter->aDelIdxR);
      tBlockDataDestroy(&pWriter->dWriter.sData, 1);
      tBlockDataDestroy(&pWriter->dWriter.bData, 1);
      if (pWriter->dWriter.aSttBlk) taosArrayDestroy(pWriter->dWriter.aSttBlk);
      if (pWriter->dWriter.aBlockIdx) taosArrayDestroy(pWriter->dWriter.aBlockIdx);
      tBlockDataDestroy(&pWriter->dReader.bData, 1);
      if (pWriter->dReader.aBlockIdx) taosArrayDestroy(pWriter->dReader.aBlockIdx);
      tBlockDataDestroy(&pWriter->bData, 1);
      tsdbFSDestroy(&pWriter->fs);
      taosMemoryFree(pWriter);
    }
  } else {
H
Hongze Cheng 已提交
1373
    tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
H
Hongze Cheng 已提交
1374 1375
    *ppWriter = pWriter;
  }
H
Hongze Cheng 已提交
1376 1377 1378
  return code;
}

H
Hongze Cheng 已提交
1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter) {
  int32_t code = 0;
  if (pWriter->dWriter.pWriter) {
    code = tsdbSnapWriteCloseFile(pWriter);
    if (code) goto _exit;
  }

  code = tsdbSnapWriteDelEnd(pWriter);
  if (code) goto _exit;

  code = tsdbFSPrepareCommit(pWriter->pTsdb, &pWriter->fs);
  if (code) goto _exit;

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
1399
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
H
Hongze Cheng 已提交
1400
  int32_t          code = 0;
H
Hongze Cheng 已提交
1401
  STsdbSnapWriter* pWriter = *ppWriter;
H
Hongze Cheng 已提交
1402
  STsdb*           pTsdb = pWriter->pTsdb;
H
Hongze Cheng 已提交
1403 1404

  if (rollback) {
H
Hongze Cheng 已提交
1405
    tsdbRollbackCommit(pWriter->pTsdb);
H
Hongze Cheng 已提交
1406
  } else {
H
Hongze Cheng 已提交
1407 1408 1409
    // lock
    taosThreadRwlockWrlock(&pTsdb->rwLock);

H
Hongze Cheng 已提交
1410
    code = tsdbFSCommit(pWriter->pTsdb);
H
Hongze Cheng 已提交
1411 1412 1413 1414 1415 1416 1417
    if (code) {
      taosThreadRwlockUnlock(&pTsdb->rwLock);
      goto _err;
    }

    // unlock
    taosThreadRwlockUnlock(&pTsdb->rwLock);
H
Hongze Cheng 已提交
1418 1419
  }

H
Hongze Cheng 已提交
1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439
  // SNAP_DATA_DEL
  taosArrayDestroy(pWriter->aDelIdxW);
  taosArrayDestroy(pWriter->aDelData);
  taosArrayDestroy(pWriter->aDelIdxR);

  // SNAP_DATA_TSDB

  // Writer
  tBlockDataDestroy(&pWriter->dWriter.sData, 1);
  tBlockDataDestroy(&pWriter->dWriter.bData, 1);
  taosArrayDestroy(pWriter->dWriter.aSttBlk);
  tMapDataClear(&pWriter->dWriter.mDataBlk);
  taosArrayDestroy(pWriter->dWriter.aBlockIdx);

  // Reader
  tBlockDataDestroy(&pWriter->dReader.bData, 1);
  tMapDataClear(&pWriter->dReader.mDataBlk);
  taosArrayDestroy(pWriter->dReader.aBlockIdx);

  tBlockDataDestroy(&pWriter->bData, 1);
1440
  tDestroyTSchema(pWriter->skmTable.pTSchema);
H
Hongze Cheng 已提交
1441

H
Hongze Cheng 已提交
1442 1443 1444
  for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t*); iBuf++) {
    tFree(pWriter->aBuf[iBuf]);
  }
H
Hongze Cheng 已提交
1445
  tsdbInfo("vgId:%d %s done", TD_VID(pWriter->pTsdb->pVnode), __func__);
H
Hongze Cheng 已提交
1446 1447 1448 1449 1450
  taosMemoryFree(pWriter);
  *ppWriter = NULL;
  return code;

_err:
S
Shengliang Guan 已提交
1451
  tsdbError("vgId:%d, vnode snapshot tsdb writer close for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
C
Cary Xu 已提交
1452 1453 1454
            pWriter->pTsdb->path, tstrerror(code));
  taosMemoryFree(pWriter);
  *ppWriter = NULL;
H
Hongze Cheng 已提交
1455 1456 1457 1458
  return code;
}

int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
H
Hongze Cheng 已提交
1459
  int32_t       code = 0;
H
Hongze Cheng 已提交
1460
  SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
H
Hongze Cheng 已提交
1461 1462

  // ts data
C
Cary Xu 已提交
1463
  if (pHdr->type == SNAP_DATA_TSDB) {
H
Hongze Cheng 已提交
1464
    code = tsdbSnapWriteData(pWriter, pData, nData);
H
Hongze Cheng 已提交
1465
    if (code) goto _err;
H
Hongze Cheng 已提交
1466 1467

    goto _exit;
H
Hongze Cheng 已提交
1468
  } else {
H
Hongze Cheng 已提交
1469
    if (pWriter->dWriter.pWriter) {
H
Hongze Cheng 已提交
1470
      code = tsdbSnapWriteCloseFile(pWriter);
H
Hongze Cheng 已提交
1471 1472
      if (code) goto _err;
    }
H
Hongze Cheng 已提交
1473 1474 1475
  }

  // del data
C
Cary Xu 已提交
1476
  if (pHdr->type == SNAP_DATA_DEL) {
H
Hongze Cheng 已提交
1477
    code = tsdbSnapWriteDel(pWriter, pData, nData);
H
Hongze Cheng 已提交
1478 1479 1480
    if (code) goto _err;
  }

H
Hongze Cheng 已提交
1481
_exit:
C
Cary Xu 已提交
1482
  tsdbDebug("vgId:%d, tsdb snapshot write for %s succeed", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
H
Hongze Cheng 已提交
1483 1484 1485
  return code;

_err:
C
Cary Xu 已提交
1486
  tsdbError("vgId:%d, tsdb snapshot write for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path,
C
Cary Xu 已提交
1487
            tstrerror(code));
H
Hongze Cheng 已提交
1488 1489
  return code;
}