tsdbSnapshot.c 35.7 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

H
Hongze Cheng 已提交
18
// STsdbSnapReader ========================================
H
Hongze Cheng 已提交
19
struct STsdbSnapReader {
H
more  
Hongze Cheng 已提交
20 21 22
  STsdb*  pTsdb;
  int64_t sver;
  int64_t ever;
H
Hongze Cheng 已提交
23
  STsdbFS fs;
C
Cary Xu 已提交
24
  int8_t  type;
H
more  
Hongze Cheng 已提交
25
  // for data file
H
Hongze Cheng 已提交
26 27
  int8_t        dataDone;
  int32_t       fid;
H
more  
Hongze Cheng 已提交
28
  SDataFReader* pDataFReader;
H
Hongze Cheng 已提交
29
  SArray*       aBlockIdx;  // SArray<SBlockIdx>
H
Hongze Cheng 已提交
30 31
  int32_t       iBlockIdx;
  SBlockIdx*    pBlockIdx;
H
Hongze Cheng 已提交
32
  SMapData      mBlock;  // SMapData<SBlock>
H
Hongze Cheng 已提交
33
  int32_t       iBlock;
H
Hongze Cheng 已提交
34 35
  SBlockData    oBlockData;
  SBlockData    nBlockData;
H
more  
Hongze Cheng 已提交
36
  // for del file
H
Hongze Cheng 已提交
37
  int8_t       delDone;
H
more  
Hongze Cheng 已提交
38
  SDelFReader* pDelFReader;
H
Hongze Cheng 已提交
39
  SArray*      aDelIdx;  // SArray<SDelIdx>
H
Hongze Cheng 已提交
40 41
  int32_t      iDelIdx;
  SArray*      aDelData;  // SArray<SDelData>
H
Hongze Cheng 已提交
42 43
};

H
Hongze Cheng 已提交
44 45
/*
 * Produce the next data-file snapshot packet.
 *
 * NOTE: the whole implementation is currently compiled out with `#if 0`,
 * so as built this function never touches *ppData and always returns 0.
 * tsdbSnapRead() therefore treats the data phase as finished on first call.
 *
 * The disabled code walks file sets -> block index entries -> blocks, keeps
 * only the rows whose version lies in [sver, ever], and packs them as
 * SSnapDataHdr + TABLEID + encoded block data.
 */
static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t code = 0;
#if 0
  STsdb*  pTsdb = pReader->pTsdb;

  while (true) {
    // open the next file set (the first one with fid greater than the current fid)
    if (pReader->pDataFReader == NULL) {
      SDFileSet* pSet =
          taosArraySearch(pReader->fs.aDFileSet, &(SDFileSet){.fid = pReader->fid}, tDFileSetCmprFn, TD_GT);

      if (pSet == NULL) goto _exit;

      pReader->fid = pSet->fid;
      code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet);
      if (code) goto _err;

      // SBlockIdx
      code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx);
      if (code) goto _err;

      pReader->iBlockIdx = 0;
      pReader->pBlockIdx = NULL;

      tsdbInfo("vgId:%d, vnode snapshot tsdb open data file to read for %s, fid:%d", TD_VID(pTsdb->pVnode), pTsdb->path,
               pReader->fid);
    }

    while (true) {
      // pick the next table of this file set, or close the file set when exhausted
      if (pReader->pBlockIdx == NULL) {
        if (pReader->iBlockIdx >= taosArrayGetSize(pReader->aBlockIdx)) {
          tsdbDataFReaderClose(&pReader->pDataFReader);
          break;
        }

        pReader->pBlockIdx = (SBlockIdx*)taosArrayGet(pReader->aBlockIdx, pReader->iBlockIdx);
        pReader->iBlockIdx++;

        code = tsdbReadBlock(pReader->pDataFReader, pReader->pBlockIdx, &pReader->mBlock);
        if (code) goto _err;

        pReader->iBlock = 0;
      }

      SBlock  block;
      SBlock* pBlock = &block;
      while (true) {
        if (pReader->iBlock >= pReader->mBlock.nItem) {
          pReader->pBlockIdx = NULL;
          break;
        }

        tMapDataGetItemByIdx(&pReader->mBlock, pReader->iBlock, pBlock, tGetBlock);
        pReader->iBlock++;

        // skip blocks whose version range does not intersect [sver, ever]
        if (pBlock->minVersion > pReader->ever || pBlock->maxVersion < pReader->sver) continue;

        code = tsdbReadBlockData(pReader->pDataFReader, pReader->pBlockIdx, pBlock, &pReader->oBlockData, NULL, NULL);
        if (code) goto _err;

        // filter
        tBlockDataReset(&pReader->nBlockData);
        for (int32_t iColData = 0; iColData < taosArrayGetSize(pReader->oBlockData.aIdx); iColData++) {
          SColData* pColDataO = tBlockDataGetColDataByIdx(&pReader->oBlockData, iColData);
          SColData* pColDataN = NULL;

          code = tBlockDataAddColData(&pReader->nBlockData, taosArrayGetSize(pReader->nBlockData.aIdx), &pColDataN);
          if (code) goto _err;

          tColDataInit(pColDataN, pColDataO->cid, pColDataO->type, pColDataO->smaOn);
        }

        for (int32_t iRow = 0; iRow < pReader->oBlockData.nRow; iRow++) {
          TSDBROW row = tsdbRowFromBlockData(&pReader->oBlockData, iRow);
          int64_t version = TSDBROW_VERSION(&row);

          tsdbTrace("vgId:%d, vnode snapshot tsdb read for %s, %" PRId64 "(%" PRId64 " , %" PRId64 ")",
                    TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path, version, pReader->sver, pReader->ever);

          if (version < pReader->sver || version > pReader->ever) continue;

          code = tBlockDataAppendRow(&pReader->nBlockData, &row, NULL);
          if (code) goto _err;
        }

        if (pReader->nBlockData.nRow <= 0) {
          continue;
        }

        // org data
        // compress data (todo)
        int32_t size = sizeof(TABLEID) + tPutBlockData(NULL, &pReader->nBlockData);

        *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
        if (*ppData == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto _err;
        }

        SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
        pHdr->type = pReader->type;
        pHdr->size = size;

        TABLEID* pId = (TABLEID*)(&pHdr[1]);
        pId->suid = pReader->pBlockIdx->suid;
        pId->uid = pReader->pBlockIdx->uid;

        tPutBlockData((uint8_t*)(&pId[1]), &pReader->nBlockData);

        tsdbInfo("vgId:%d, vnode snapshot read data for %s, fid:%d suid:%" PRId64 " uid:%" PRId64
                 " iBlock:%d minVersion:%d maxVersion:%d nRow:%d out of %d size:%d",
                 TD_VID(pTsdb->pVnode), pTsdb->path, pReader->fid, pReader->pBlockIdx->suid, pReader->pBlockIdx->uid,
                 pReader->iBlock - 1, pBlock->minVersion, pBlock->maxVersion, pReader->nBlockData.nRow, pBlock->nRow,
                 size);

        goto _exit;
      }
    }
  }

_exit:
  return code;

_err:
  tsdbError("vgId:%d, vnode snapshot tsdb read data for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
            tstrerror(code));
#endif
  return code;
}

/*
 * Produce the next delete-data snapshot packet, if there is one.
 *
 * On the first call the del file referenced by pReader->fs is opened and its
 * index loaded. Each call then advances through the index until it finds a
 * table that has at least one SDelData whose version lies in [sver, ever],
 * and emits one packet for that table:
 *
 *   SSnapDataHdr | TABLEID | encoded SDelData ...
 *
 * where SSnapDataHdr.size covers TABLEID plus the encoded entries.
 *
 * pReader  snapshot reader state
 * ppData   receives a buffer obtained from taosMemoryMalloc(); it is only
 *          written when a packet is produced (the caller presets it to NULL
 *          and is presumably responsible for freeing it -- confirm at caller)
 *
 * Returns 0 on success (with or without a packet), otherwise the error code
 * of the failing step or TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t   code = 0;
  STsdb*    pTsdb = pReader->pTsdb;
  SDelFile* pDelFile = pReader->fs.pDelFile;

  if (pReader->pDelFReader == NULL) {
    // no del file at all: nothing to send
    if (pDelFile == NULL) {
      goto _exit;
    }

    // open
    code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pTsdb);
    if (code) goto _err;

    // read index
    code = tsdbReadDelIdx(pReader->pDelFReader, pReader->aDelIdx);
    if (code) goto _err;

    pReader->iDelIdx = 0;
  }

  while (true) {
    // index exhausted: close the reader and report "no packet"
    if (pReader->iDelIdx >= taosArrayGetSize(pReader->aDelIdx)) {
      tsdbDelFReaderClose(&pReader->pDelFReader);
      break;
    }

    SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pReader->aDelIdx, pReader->iDelIdx);

    pReader->iDelIdx++;

    code = tsdbReadDelData(pReader->pDelFReader, pDelIdx, pReader->aDelData);
    if (code) goto _err;

    // first pass: measure the encoded size of the entries inside the version window
    int32_t size = 0;
    for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) {
      SDelData* pDelData = (SDelData*)taosArrayGet(pReader->aDelData, iDelData);

      if (pDelData->version >= pReader->sver && pDelData->version <= pReader->ever) {
        size += tPutDelData(NULL, pDelData);
      }
    }

    // nothing in range for this table, try the next one
    if (size == 0) continue;

    // org data
    size = sizeof(TABLEID) + size;
    *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
    if (*ppData == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

    SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
    pHdr->type = SNAP_DATA_DEL;
    pHdr->size = size;

    TABLEID* pId = (TABLEID*)(&pHdr[1]);
    pId->suid = pDelIdx->suid;
    pId->uid = pDelIdx->uid;

    // second pass: encode the same entries right after the TABLEID
    int32_t n = sizeof(SSnapDataHdr) + sizeof(TABLEID);
    for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); iDelData++) {
      SDelData* pDelData = (SDelData*)taosArrayGet(pReader->aDelData, iDelData);

      if (pDelData->version < pReader->sver) continue;
      if (pDelData->version > pReader->ever) continue;

      n += tPutDelData((*ppData) + n, pDelData);
    }

    // uid is printed with PRId64 like suid (the previous "%d" PRId64 was a malformed specifier)
    tsdbInfo("vgId:%d, vnode snapshot tsdb read del data for %s, suid:%" PRId64 " uid:%" PRId64 " size:%d",
             TD_VID(pTsdb->pVnode), pTsdb->path, pDelIdx->suid, pDelIdx->uid, size);

    break;
  }

_exit:
  return code;

_err:
  // the first %s takes the tsdb path (previously the vnode pointer was passed here)
  tsdbError("vgId:%d, vnode snapshot tsdb read del for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
            tstrerror(code));
  return code;
}
H
more  
Hongze Cheng 已提交
257

C
Cary Xu 已提交
258
/*
 * Create a snapshot reader over pTsdb for the version window [sver, ever].
 *
 * The reader takes a reference on the tsdb file-system state (tsdbFSRef)
 * while holding pTsdb->rwLock in read mode, then allocates the working
 * buffers used by the data and del phases.
 *
 * pTsdb     tsdb instance to read
 * sver/ever version window, stored verbatim in the reader
 * type      packet type, stored verbatim in the reader
 * ppReader  receives the new reader on success, NULL on failure
 *
 * Returns 0 on success. On failure everything acquired so far is released
 * (buffers, file-system reference, the reader itself) and the error code is
 * returned.
 */
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, STsdbSnapReader** ppReader) {
  int32_t          code = 0;
  STsdbSnapReader* pReader = NULL;
  // what has been acquired so far; consulted by the error path only
  bool fsRefed = false;
  bool mBlockInited = false;
  bool oBlockCreated = false;
  bool nBlockCreated = false;

  // alloc
  pReader = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pReader->pTsdb = pTsdb;
  pReader->sver = sver;
  pReader->ever = ever;
  pReader->type = type;

  // reference the current file-system state under the read lock
  code = taosThreadRwlockRdlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _err;
  }

  code = tsdbFSRef(pTsdb, &pReader->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    goto _err;
  }
  fsRefed = true;

  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _err;
  }

  // data file state
  pReader->fid = INT32_MIN;
  pReader->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pReader->aBlockIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pReader->mBlock = tMapDataInit();
  mBlockInited = true;
  code = tBlockDataCreate(&pReader->oBlockData);
  if (code) goto _err;
  oBlockCreated = true;
  code = tBlockDataCreate(&pReader->nBlockData);
  if (code) goto _err;
  nBlockCreated = true;

  // del file state
  pReader->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
  if (pReader->aDelIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pReader->aDelData = taosArrayInit(0, sizeof(SDelData));
  if (pReader->aDelData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  tsdbInfo("vgId:%d, vnode snapshot tsdb reader opened for %s", TD_VID(pTsdb->pVnode), pTsdb->path);
  *ppReader = pReader;
  return code;

_err:
  tsdbError("vgId:%d, vnode snapshot tsdb reader open for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
            tstrerror(code));

  // Undo whatever was acquired, using the same calls as tsdbSnapReaderClose().
  // Only objects known to be fully constructed are torn down.
  if (pReader) {
    if (pReader->aDelData) taosArrayDestroy(pReader->aDelData);
    if (pReader->aDelIdx) taosArrayDestroy(pReader->aDelIdx);
    if (nBlockCreated) tBlockDataDestroy(&pReader->nBlockData, 1);
    if (oBlockCreated) tBlockDataDestroy(&pReader->oBlockData, 1);
    if (mBlockInited) tMapDataClear(&pReader->mBlock);
    if (pReader->aBlockIdx) taosArrayDestroy(pReader->aBlockIdx);
    if (fsRefed) tsdbFSUnref(pTsdb, &pReader->fs);
    taosMemoryFree(pReader);
  }

  *ppReader = NULL;
  return code;
}

H
Hongze Cheng 已提交
325
/*
 * Destroy a snapshot reader created by tsdbSnapReaderOpen().
 *
 * Closes any file reader that is still open, releases the working buffers,
 * drops the file-system reference and frees the reader. *ppReader is set to
 * NULL. Always returns 0.
 */
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
  STsdbSnapReader* pSelf = *ppReader;
  STsdb*           pTsdb = pSelf->pTsdb;

  // data file side
  if (pSelf->pDataFReader) {
    tsdbDataFReaderClose(&pSelf->pDataFReader);
  }
  taosArrayDestroy(pSelf->aBlockIdx);
  tMapDataClear(&pSelf->mBlock);
  tBlockDataDestroy(&pSelf->oBlockData, 1);
  tBlockDataDestroy(&pSelf->nBlockData, 1);

  // del file side
  if (pSelf->pDelFReader) {
    tsdbDelFReaderClose(&pSelf->pDelFReader);
  }
  taosArrayDestroy(pSelf->aDelIdx);
  taosArrayDestroy(pSelf->aDelData);

  // release the file-system state taken at open
  tsdbFSUnref(pTsdb, &pSelf->fs);

  tsdbInfo("vgId:%d, vnode snapshot tsdb reader closed for %s", TD_VID(pTsdb->pVnode), pTsdb->path);

  taosMemoryFree(pSelf);
  *ppReader = NULL;
  return 0;
}

/*
 * Fetch the next snapshot packet from the reader.
 *
 * The data phase is tried first, then the del phase. A phase that returns
 * without producing a packet is marked done and never tried again.
 *
 * ppData is reset to NULL on entry; it is non-NULL on return only when a
 * packet was produced. A return of 0 with *ppData == NULL means both phases
 * are finished.
 */
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t code = 0;

  *ppData = NULL;

  // phase 1: data files
  if (!pReader->dataDone) {
    code = tsdbSnapReadData(pReader, ppData);
    if (code) goto _err;
    if (*ppData) goto _exit;

    pReader->dataDone = 1;
  }

  // phase 2: del file
  if (!pReader->delDone) {
    code = tsdbSnapReadDel(pReader, ppData);
    if (code) goto _err;
    if (*ppData) goto _exit;

    pReader->delDone = 1;
  }

_exit:
  tsdbDebug("vgId:%d, vnode snapshot tsdb read for %s", TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path);
  return code;

_err:
  tsdbError("vgId:%d, vnode snapshot tsdb read for %s failed since %s", TD_VID(pReader->pTsdb->pVnode),
            pReader->pTsdb->path, tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
395
// STsdbSnapWriter ========================================
H
Hongze Cheng 已提交
396 397 398 399
struct STsdbSnapWriter {
  STsdb*  pTsdb;
  int64_t sver;
  int64_t ever;
H
Hongze Cheng 已提交
400
  STsdbFS fs;
H
Hongze Cheng 已提交
401

H
Hongze Cheng 已提交
402 403 404
  // config
  int32_t minutes;
  int8_t  precision;
H
Hongze Cheng 已提交
405 406 407
  int32_t minRow;
  int32_t maxRow;
  int8_t  cmprAlg;
H
Hongze Cheng 已提交
408
  int64_t commitID;
H
Hongze Cheng 已提交
409

H
Hongze Cheng 已提交
410
  // for data file
H
Hongze Cheng 已提交
411 412
  SBlockData bData;

H
Hongze Cheng 已提交
413
  int32_t       fid;
H
Hongze Cheng 已提交
414
  SDataFReader* pDataFReader;
H
Hongze Cheng 已提交
415
  SArray*       aBlockIdx;  // SArray<SBlockIdx>
H
Hongze Cheng 已提交
416
  int32_t       iBlockIdx;
H
Hongze Cheng 已提交
417
  SBlockIdx*    pBlockIdx;
H
Hongze Cheng 已提交
418
  SMapData      mBlock;  // SMapData<SBlock>
H
Hongze Cheng 已提交
419
  int32_t       iBlock;
H
Hongze Cheng 已提交
420
  SBlockData*   pBlockData;
H
Hongze Cheng 已提交
421
  int32_t       iRow;
H
Hongze Cheng 已提交
422
  SBlockData    bDataR;
H
Hongze Cheng 已提交
423

H
Hongze Cheng 已提交
424
  SDataFWriter* pDataFWriter;
H
Hongze Cheng 已提交
425
  SBlockIdx*    pBlockIdxW;  // NULL when no committing table
H
Hongze Cheng 已提交
426 427
  SBlock        blockW;
  SBlockData    bDataW;
H
Hongze Cheng 已提交
428
  SBlockIdx     blockIdxW;
H
Hongze Cheng 已提交
429

H
Hongze Cheng 已提交
430 431 432
  SMapData mBlockW;     // SMapData<SBlock>
  SArray*  aBlockIdxW;  // SArray<SBlockIdx>

H
Hongze Cheng 已提交
433
  // for del file
H
Hongze Cheng 已提交
434
  SDelFReader* pDelFReader;
H
Hongze Cheng 已提交
435
  SDelFWriter* pDelFWriter;
H
Hongze Cheng 已提交
436
  int32_t      iDelIdx;
H
Hongze Cheng 已提交
437
  SArray*      aDelIdxR;
H
Hongze Cheng 已提交
438
  SArray*      aDelData;
H
Hongze Cheng 已提交
439
  SArray*      aDelIdxW;
H
Hongze Cheng 已提交
440 441
};

H
more  
Hongze Cheng 已提交
442
#if 0
H
Hongze Cheng 已提交
443 444 445 446 447 448 449
/*
 * Append the rows of a decoded block to the pending output block (bDataW).
 *
 * This is still a stub: the source block and its row count are placeholders
 * (marked todo), and pData / nData are not consumed yet, so no row is
 * appended today.
 *
 * Returns 0, or the error code of tBlockDataAppendRow().
 */
static int32_t tsdbSnapWriteAppendData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
  int32_t     code = 0;
  int32_t     nRow = 0;           // todo
  SBlockData* pBlockData = NULL;  // todo

  // advance the row index every iteration; the previous loop never incremented it
  // and would spin forever as soon as nRow became non-zero
  for (int32_t iRow = 0; iRow < nRow; iRow++) {
    code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pBlockData, iRow), NULL);
    if (code) goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d, tsdb snapshot write append data for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
            pWriter->pTsdb->path, tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
462
/*
 * Finish writing the table currently being committed (pBlockIdxW).
 *
 * Steps:
 *   1. drain the rest of the on-disk block that was being merged, if any;
 *   2. flush whatever is left in bDataW as one more output block;
 *   3. carry over the remaining on-disk blocks of the table (blocks flagged
 *      `last` are read and rewritten, the others only have their descriptor
 *      copied);
 *   4. write the block map and push the table's index entry to aBlockIdxW.
 *
 * Does nothing but log when no table is being committed.
 * Returns 0, or the error code of the failing step.
 */
static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
  int32_t code = 0;

  ASSERT(pWriter->pDataFWriter);

  if (pWriter->pBlockIdxW == NULL) goto _exit;

  // consume remain rows
  if (pWriter->pBlockData) {
    ASSERT(pWriter->iRow < pWriter->pBlockData->nRow);
    for (; pWriter->iRow < pWriter->pBlockData->nRow; pWriter->iRow++) {
      code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow), NULL);
      if (code) goto _err;

      // keep accumulating until the flush threshold is reached
      if (pWriter->bDataW.nRow < pWriter->maxRow * 4 / 5) continue;

      pWriter->blockW.last = 0;
      code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
                                &pWriter->blockW, pWriter->cmprAlg);
      if (code) goto _err;

      code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
      if (code) goto _err;

      tBlockReset(&pWriter->blockW);
      tBlockDataClear(&pWriter->bDataW);
    }
  }

  // write remain data if has
  if (pWriter->bDataW.nRow > 0) {
    // NOTE(review): `iBlock > nItem` looks like it can never hold if iBlock only
    // ever advances up to nItem -- possibly `>=` was intended; confirm.
    int8_t isLast = (pWriter->bDataW.nRow < pWriter->minRow) && (pWriter->iBlock > pWriter->mBlock.nItem);
    pWriter->blockW.last = isLast ? 1 : 0;

    code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
                              &pWriter->blockW, pWriter->cmprAlg);
    if (code) goto _err;

    code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
    if (code) goto _err;
  }

  // carry over the on-disk blocks that were not touched by the merge
  for (; pWriter->iBlock < pWriter->mBlock.nItem; pWriter->iBlock++) {
    SBlock block;
    tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock);

    if (block.last) {
      // a `last` block is read back and written again through the new writer
      code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, &pWriter->bDataR, NULL, NULL);
      if (code) goto _err;

      tBlockReset(&block);
      block.last = 1;
      code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pWriter->pBlockIdxW, &block,
                                pWriter->cmprAlg);
      if (code) goto _err;
    }

    code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock);
    if (code) goto _err;
  }

  // SBlock
  code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, pWriter->pBlockIdxW);
  if (code) goto _err;

  // SBlockIdx
  if (taosArrayPush(pWriter->aBlockIdxW, pWriter->pBlockIdxW) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

_exit:
  tsdbInfo("vgId:%d, tsdb snapshot write table data end for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
  return code;

_err:
  tsdbError("vgId:%d, tsdb snapshot write table data end for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
            pWriter->pTsdb->path, tstrerror(code));
  return code;
}
H
Hongze Cheng 已提交
552

H
Hongze Cheng 已提交
553
/*
 * Carry one existing table over to the new data file without merging any
 * snapshot rows into it.
 *
 * The table's block map is loaded; blocks flagged `last` are read and written
 * again through pDataFWriter, all other blocks only have their descriptor
 * copied. The resulting block map is written and a fresh index entry for the
 * table (same suid/uid) is appended to aBlockIdxW.
 *
 * pBlockIdx  index entry of the table in the file being read
 *
 * Returns 0, or the error code of the failing step.
 */
static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* pBlockIdx) {
  int32_t code = tsdbReadBlock(pWriter->pDataFReader, pBlockIdx, &pWriter->mBlock);
  if (code) goto _err;

  // SBlockData
  SBlock blk;
  tMapDataReset(&pWriter->mBlockW);
  for (int32_t i = 0; i < pWriter->mBlock.nItem; i++) {
    tMapDataGetItemByIdx(&pWriter->mBlock, i, &blk, tGetBlock);

    if (blk.last) {
      code = tsdbReadBlockData(pWriter->pDataFReader, pBlockIdx, &blk, &pWriter->bDataR, NULL, NULL);
      if (code) goto _err;

      // rewrite the block; its descriptor is rebuilt by the writer
      tBlockReset(&blk);
      blk.last = 1;
      code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pBlockIdx, &blk, pWriter->cmprAlg);
      if (code) goto _err;
    }

    code = tMapDataPutItem(&pWriter->mBlockW, &blk, tPutBlock);
    if (code) goto _err;
  }

  // SBlock
  SBlockIdx idxW = {.suid = pBlockIdx->suid, .uid = pBlockIdx->uid};
  code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, &idxW);
  if (code) goto _err;

  // SBlockIdx
  if (taosArrayPush(pWriter->aBlockIdxW, &idxW) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d, tsdb snapshot move write table data for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
            pWriter->pTsdb->path, tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
600
/*
 * Merge the incoming block (pWriter->bData) into the table being committed.
 *
 * Incoming rows are consumed in order (local iRow). Two situations alternate:
 *
 *  - an on-disk block is loaded (pWriter->pBlockData != NULL): the incoming
 *    row and the current on-disk row are compared and the smaller one is
 *    appended to bDataW;
 *  - no on-disk block is loaded: the remaining on-disk blocks are scanned.
 *    Blocks that end before the incoming row are carried over by descriptor,
 *    a block that overlaps the incoming data (or is flagged `last`) is loaded
 *    for row-level merging, otherwise the incoming row is appended directly.
 *
 * Whenever bDataW reaches maxRow * 4 / 5 rows it is written out as a block.
 * Rows still pending in bDataW or in the loaded on-disk block when the
 * incoming rows run out are left for tsdbSnapWriteTableDataEnd().
 *
 * Returns 0, or the error code of the failing step.
 */
static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
  int32_t     code = 0;
  SBlockData* pBlockData = &pWriter->bData;
  int32_t     iRow = 0;
  TSDBROW     row;
  TSDBROW*    pRow = &row;  // current incoming row, NULL once bData is exhausted

  // correct schema
  code = tBlockDataCorrectSchema(&pWriter->bDataW, pBlockData);
  if (code) goto _err;

  // loop to merge
  *pRow = tsdbRowFromBlockData(pBlockData, iRow);
  while (true) {
    if (pRow == NULL) break;

    if (pWriter->pBlockData) {
      ASSERT(pWriter->iRow < pWriter->pBlockData->nRow);

      int32_t c = tsdbRowCmprFn(pRow, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow));

      ASSERT(c);

      if (c < 0) {
        // incoming row goes first
        code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL);
        if (code) goto _err;

        iRow++;
        // iRow indexes the incoming block, so it is bounded by pBlockData->nRow
        // (the on-disk block's row count was used here before, which is unrelated)
        if (iRow < pBlockData->nRow) {
          *pRow = tsdbRowFromBlockData(pBlockData, iRow);
        } else {
          pRow = NULL;
        }
      } else if (c > 0) {
        // on-disk row goes first
        code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow), NULL);
        if (code) goto _err;

        pWriter->iRow++;
        if (pWriter->iRow >= pWriter->pBlockData->nRow) {
          pWriter->pBlockData = NULL;
        }
      }
    } else {
      TSDBKEY key = TSDBROW_KEY(pRow);

      while (true) {
        if (pWriter->iBlock >= pWriter->mBlock.nItem) break;

        SBlock  block;
        int32_t c;

        tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock);

        if (block.last) {
          // a `last` block is always loaded for row-level merging
          pWriter->pBlockData = &pWriter->bDataR;

          code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL, NULL);
          if (code) goto _err;
          pWriter->iRow = 0;

          pWriter->iBlock++;
          break;
        }

        c = tsdbKeyCmprFn(&block.maxKey, &key);

        ASSERT(c);

        if (c < 0) {
          // the whole on-disk block precedes the incoming row: flush what is
          // pending so ordering is kept, then carry the block over as is
          if (pWriter->bDataW.nRow) {
            pWriter->blockW.last = 0;
            code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
                                      &pWriter->blockW, pWriter->cmprAlg);
            if (code) goto _err;

            code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
            if (code) goto _err;

            tBlockReset(&pWriter->blockW);
            tBlockDataClear(&pWriter->bDataW);
          }

          code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock);
          if (code) goto _err;

          pWriter->iBlock++;
        } else {
          c = tsdbKeyCmprFn(&tBlockDataLastKey(pBlockData), &block.minKey);

          ASSERT(c);

          if (c > 0) {
            // incoming data reaches into this block: load it for merging
            pWriter->pBlockData = &pWriter->bDataR;
            code =
                tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL, NULL);
            if (code) goto _err;
            pWriter->iRow = 0;

            pWriter->iBlock++;
          }
          break;
        }
      }

      // a block was just loaded: restart the iteration in merge mode
      if (pWriter->pBlockData) continue;

      code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL);
      if (code) goto _err;

      iRow++;
      if (iRow < pBlockData->nRow) {
        *pRow = tsdbRowFromBlockData(pBlockData, iRow);
      } else {
        pRow = NULL;
      }
    }

    // check whether the pending block is large enough to be written
    if (pWriter->bDataW.nRow < pWriter->maxRow * 4 / 5) continue;

    // write the pending block
    code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
                              &pWriter->blockW, pWriter->cmprAlg);
    if (code) goto _err;

    code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
    if (code) goto _err;

    tBlockReset(&pWriter->blockW);
    tBlockDataClear(&pWriter->bDataW);
  }

  return code;

_err:
  tsdbError("vgId:%d, vnode snapshot tsdb write table data impl for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
            pWriter->pTsdb->path, tstrerror(code));
  return code;
}

// Route the block currently decoded in pWriter->bData to the output table `id`.
//
// Steps, in order:
//   1. If an output table is already open (pWriter->pBlockIdxW != NULL) and it compares
//      lower than `id`, it is finished with tsdbSnapWriteTableDataEnd(). A higher
//      comparison result trips ASSERT(0).
//   2. If no output table is open, every entry of pWriter->aBlockIdx that compares lower
//      than `id` is handed to tsdbSnapMoveWriteTableData(); an entry equal to `id` (if
//      any) becomes the read side, and the write-side state is reset for `id`.
//   3. tsdbSnapWriteTableDataImpl() is called.
//
// Returns 0 when every step succeeds, otherwise the first non-zero code from a callee.
static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) {
  int32_t code = 0;

  // end last table write if should
  if (pWriter->pBlockIdxW) {
    int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdxW, &id);
    if (c < 0) {
      // end
      code = tsdbSnapWriteTableDataEnd(pWriter);
      if (code) goto _err;

      // reset
      pWriter->pBlockIdxW = NULL;
    } else if (c > 0) {
      ASSERT(0);
    }
  }

  // start new table data write if need
  if (pWriter->pBlockIdxW == NULL) {
    // write table data ahead: flush existing tables that compare lower than `id`
    while (true) {
      if (pWriter->iBlockIdx >= taosArrayGetSize(pWriter->aBlockIdx)) break;

      SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx);
      int32_t    c = tTABLEIDCmprFn(pBlockIdx, &id);

      if (c >= 0) break;

      code = tsdbSnapMoveWriteTableData(pWriter, pBlockIdx);
      if (code) goto _err;

      pWriter->iBlockIdx++;
    }

    // reader: pick up the existing entry for `id`, if there is one
    pWriter->pBlockIdx = NULL;
    if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) {
      ASSERT(pWriter->pDataFReader);

      SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx);
      int32_t    c = tTABLEIDCmprFn(pBlockIdx, &id);

      ASSERT(c >= 0);

      if (c == 0) {
        pWriter->pBlockIdx = pBlockIdx;
        pWriter->iBlockIdx++;
      }
    }

    if (pWriter->pBlockIdx) {
      code = tsdbReadBlock(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlock);
      if (code) goto _err;
    } else {
      tMapDataReset(&pWriter->mBlock);
    }
    pWriter->iBlock = 0;
    pWriter->pBlockData = NULL;
    pWriter->iRow = 0;

    // writer
    pWriter->pBlockIdxW = &pWriter->blockIdxW;
    pWriter->pBlockIdxW->suid = id.suid;
    pWriter->pBlockIdxW->uid = id.uid;

    tBlockReset(&pWriter->blockW);
    tBlockDataReset(&pWriter->bDataW);
    tMapDataReset(&pWriter->mBlockW);
  }

  ASSERT(pWriter->pBlockIdxW && pWriter->pBlockIdxW->suid == id.suid && pWriter->pBlockIdxW->uid == id.uid);
  ASSERT(pWriter->pBlockIdx == NULL || (pWriter->pBlockIdx->suid == id.suid && pWriter->pBlockIdx->uid == id.uid));

  code = tsdbSnapWriteTableDataImpl(pWriter);
  if (code) goto _err;

_exit:
  tsdbDebug("vgId:%d, vnode snapshot tsdb write data impl for %s", TD_VID(pWriter->pTsdb->pVnode),
            pWriter->pTsdb->path);
  return code;

_err:
  tsdbError("vgId:%d, vnode snapshot tsdb write data impl for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
            pWriter->pTsdb->path, tstrerror(code));
  return code;
}
H
Hongze Cheng 已提交
830

H
Hongze Cheng 已提交
831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846
// Finish the data file set that is currently being written, if any.
//
// When pWriter->pDataFWriter is NULL there is nothing to finish and the function only
// logs. Otherwise it ends the open table, passes every remaining entry of
// pWriter->aBlockIdx to tsdbSnapMoveWriteTableData(), writes the block index, upserts
// the written file set into pWriter->fs, and closes the data writer and (if open) the
// data reader.
//
// Returns 0 on success, otherwise the first non-zero code from a callee.
static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) {
  int32_t code = 0;
  STsdb*  pTsdb = pWriter->pTsdb;

  if (pWriter->pDataFWriter == NULL) goto _exit;

  code = tsdbSnapWriteTableDataEnd(pWriter);
  if (code) goto _err;

  // carry over the tables that the snapshot did not touch
  while (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) {
    code = tsdbSnapMoveWriteTableData(pWriter, (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx));
    if (code) goto _err;

    pWriter->iBlockIdx++;
  }

  code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdxW);
  if (code) goto _err;

  code = tsdbFSUpsertFSet(&pWriter->fs, &pWriter->pDataFWriter->wSet);
  if (code) goto _err;

  code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 1);
  if (code) goto _err;

  if (pWriter->pDataFReader) {
    code = tsdbDataFReaderClose(&pWriter->pDataFReader);
    if (code) goto _err;
  }

_exit:
  tsdbInfo("vgId:%d, vnode snapshot tsdb writer data end for %s", TD_VID(pTsdb->pVnode), pTsdb->path);
  return code;

_err:
  tsdbError("vgId:%d, vnode snapshot tsdb writer data end for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
            tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
871
// Apply one time-series data packet of a snapshot.
//
// Packet layout as read here: an SSnapDataHdr, followed by a TABLEID, followed by block
// data decoded with tGetBlockData() into pWriter->bData. The decoded length plus the
// two headers is asserted to equal nData.
//
// The file id is derived from the first key of the block (and asserted to match the
// last key). If no data writer is open, or the file id differs from pWriter->fid, the
// previous file set is finished with tsdbSnapWriteDataEnd(), and a reader (when the
// file set already exists in pWriter->fs) and a new writer are opened for the new fid.
// The block is then handed to tsdbSnapWriteTableData().
//
// Returns 0 on success, otherwise the first non-zero code from a callee.
static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
  int32_t code = 0;
  STsdb*  pTsdb = pWriter->pTsdb;
  TABLEID id = *(TABLEID*)(pData + sizeof(SSnapDataHdr));
  int64_t n;

  // decode
  SBlockData* pBlockData = &pWriter->bData;
  n = tGetBlockData(pData + sizeof(SSnapDataHdr) + sizeof(TABLEID), pBlockData);
  ASSERT(n + sizeof(SSnapDataHdr) + sizeof(TABLEID) == nData);

  // open file
  TSDBKEY keyFirst = tBlockDataFirstKey(pBlockData);
  TSDBKEY keyLast = tBlockDataLastKey(pBlockData);

  int32_t fid = tsdbKeyFid(keyFirst.ts, pWriter->minutes, pWriter->precision);
  ASSERT(fid == tsdbKeyFid(keyLast.ts, pWriter->minutes, pWriter->precision));
  if (pWriter->pDataFWriter == NULL || pWriter->fid != fid) {
    // end last file data write if need
    code = tsdbSnapWriteDataEnd(pWriter);
    if (code) goto _err;

    pWriter->fid = fid;

    // read
    SDFileSet* pSet = taosArraySearch(pWriter->fs.aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ);
    if (pSet) {
      code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet);
      if (code) goto _err;

      code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx);
      if (code) goto _err;
    } else {
      ASSERT(pWriter->pDataFReader == NULL);
      taosArrayClear(pWriter->aBlockIdx);
    }
    pWriter->iBlockIdx = 0;
    pWriter->pBlockIdx = NULL;
    tMapDataReset(&pWriter->mBlock);
    pWriter->iBlock = 0;
    pWriter->pBlockData = NULL;
    pWriter->iRow = 0;
    tBlockDataReset(&pWriter->bDataR);

    // write
    SHeadFile fHead;
    SDataFile fData;
    SLastFile fLast;
    SSmaFile  fSma;
    SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .pLastF = &fLast, .pSmaF = &fSma};

    if (pSet) {
      // existing file set: head and last files start fresh, data and sma descriptors are copied
      wSet.diskId = pSet->diskId;
      wSet.fid = fid;
      fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0, .loffset = 0};
      fData = *pSet->pDataF;
      fLast = (SLastFile){.commitID = pWriter->commitID, .size = 0};
      fSma = *pSet->pSmaF;
    } else {
      // brand new file set: all four files start empty
      wSet.diskId = (SDiskID){.level = 0, .id = 0};
      wSet.fid = fid;
      fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0, .loffset = 0};
      fData = (SDataFile){.commitID = pWriter->commitID, .size = 0};
      fLast = (SLastFile){.commitID = pWriter->commitID, .size = 0};
      fSma = (SSmaFile){.commitID = pWriter->commitID, .size = 0};
    }

    code = tsdbDataFWriterOpen(&pWriter->pDataFWriter, pTsdb, &wSet);
    if (code) goto _err;

    taosArrayClear(pWriter->aBlockIdxW);
    tMapDataReset(&pWriter->mBlockW);
    pWriter->pBlockIdxW = NULL;
    tBlockDataReset(&pWriter->bDataW);
  }

  code = tsdbSnapWriteTableData(pWriter, id);
  if (code) goto _err;

  // the uid field must print id.uid (it previously printed id.suid a second time)
  tsdbInfo("vgId:%d, vnode snapshot tsdb write data for %s, fid:%d suid:%" PRId64 " uid:%" PRId64 " nRow:%d",
           TD_VID(pTsdb->pVnode), pTsdb->path, fid, id.suid, id.uid, pBlockData->nRow);
  return code;

_err:
  tsdbError("vgId:%d, vnode snapshot tsdb write data for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
            tstrerror(code));
  return code;
}

// Apply one delete-data packet of a snapshot.
//
// Packet layout as read here: an SSnapDataHdr, followed by a TABLEID, followed by
// SDelData records decoded one after another with tGetDelData() until nData bytes are
// consumed.
//
// On first use the del-file writer is opened; if pWriter->fs already has a del file, a
// reader is opened too and its index is loaded into pWriter->aDelIdxR. The loop then
// walks the existing index from pWriter->iDelIdx:
//   - no existing entry left, or the packet's table compares lower than the next
//     existing entry: write a new entry holding only the packet's records;
//   - equal: read the existing records, append the packet's records, write the merged
//     entry;
//   - packet's table compares higher: copy the existing entry through unchanged and
//     continue with the next one.
// Every written entry is also appended to pWriter->aDelIdxW.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when an array push fails, otherwise the
// first non-zero code from a callee.
static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
  int32_t code = 0;
  STsdb*  pTsdb = pWriter->pTsdb;

  if (pWriter->pDelFWriter == NULL) {
    SDelFile* pDelFile = pWriter->fs.pDelFile;

    // reader
    if (pDelFile) {
      code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pDelFile, pTsdb, NULL);
      if (code) goto _err;

      code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdxR);
      if (code) goto _err;
    }

    // writer
    SDelFile delFile = {.commitID = pWriter->commitID, .offset = 0, .size = 0};
    code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &delFile, pTsdb);
    if (code) goto _err;
  }

  // process the del data
  TABLEID id = *(TABLEID*)(pData + sizeof(SSnapDataHdr));

  while (true) {
    SDelIdx* pDelIdx = NULL;
    int64_t  n = sizeof(SSnapDataHdr) + sizeof(TABLEID);  // offset of the first SDelData record
    SDelData delData;
    SDelIdx  delIdx;
    int8_t   toBreak = 0;  // set once the packet's own table has been handled

    if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR)) {
      pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);
    }

    if (pDelIdx) {
      int32_t c = tTABLEIDCmprFn(&id, pDelIdx);
      if (c < 0) {
        goto _new_del;
      } else {
        code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData);
        if (code) goto _err;

        pWriter->iDelIdx++;
        if (c == 0) {
          toBreak = 1;
          delIdx = (SDelIdx){.suid = id.suid, .uid = id.uid};
          goto _merge_del;
        } else {
          delIdx = (SDelIdx){.suid = pDelIdx->suid, .uid = pDelIdx->uid};
          goto _write_del;
        }
      }
    }

  _new_del:
    toBreak = 1;
    delIdx = (SDelIdx){.suid = id.suid, .uid = id.uid};
    taosArrayClear(pWriter->aDelData);

  _merge_del:
    // append the packet's records after whatever aDelData already holds
    while (n < nData) {
      n += tGetDelData(pData + n, &delData);
      if (taosArrayPush(pWriter->aDelData, &delData) == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }
    }

  _write_del:
    code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx);
    if (code) goto _err;

    if (taosArrayPush(pWriter->aDelIdxW, &delIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

    if (toBreak) break;
  }

_exit:
  return code;

_err:
  tsdbError("vgId:%d, vnode snapshot tsdb write del for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
            tstrerror(code));
  return code;
}
H
more  
Hongze Cheng 已提交
1050
#endif
H
Hongze Cheng 已提交
1051

H
more  
Hongze Cheng 已提交
1052
#if 0
H
Hongze Cheng 已提交
1053 1054
// Finish the del file that is currently being written, if any.
//
// When pWriter->pDelFWriter is NULL the function only logs. Otherwise every entry of
// the existing index (pWriter->aDelIdxR) from pWriter->iDelIdx onwards is read and
// rewritten unchanged through the del writer, the del file header is updated, the new
// del file is upserted into pWriter->fs, and the writer and (if open) the reader are
// closed.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when an array push fails, otherwise the
// first non-zero code from a callee.
static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
  int32_t code = 0;
  STsdb*  pTsdb = pWriter->pTsdb;

  if (pWriter->pDelFWriter == NULL) goto _exit;

  for (; pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR); pWriter->iDelIdx++) {
    SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx);

    code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData);
    if (code) goto _err;

    SDelIdx delIdx = (SDelIdx){.suid = pDelIdx->suid, .uid = pDelIdx->uid};
    code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx);
    if (code) goto _err;

    // Record the entry in the WRITE-side index. Pushing onto aDelIdxR (the array being
    // iterated, whose size is re-read every pass) would grow the loop bound in step
    // with iDelIdx, so the loop would never terminate.
    if (taosArrayPush(pWriter->aDelIdxW, &delIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }

  // NOTE(review): nothing visible here writes pWriter->aDelIdxW to the file before the
  // header update — confirm the index is flushed by tsdbUpdateDelFileHdr() or elsewhere.
  code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter);
  if (code) goto _err;

  code = tsdbFSUpsertDelFile(&pWriter->fs, &pWriter->pDelFWriter->fDel);
  if (code) goto _err;

  code = tsdbDelFWriterClose(&pWriter->pDelFWriter, 1);
  if (code) goto _err;

  if (pWriter->pDelFReader) {
    code = tsdbDelFReaderClose(&pWriter->pDelFReader);
    if (code) goto _err;
  }

_exit:
  tsdbInfo("vgId:%d, vnode snapshot tsdb write del for %s end", TD_VID(pTsdb->pVnode), pTsdb->path);
  return code;

_err:
  tsdbError("vgId:%d, vnode snapshot tsdb write del end for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
            tstrerror(code));
  return code;
}
H
more  
Hongze Cheng 已提交
1098
#endif
H
Hongze Cheng 已提交
1099

H
Hongze Cheng 已提交
1100
// Create a snapshot writer for `pTsdb` covering versions [sver, ever].
//
// NOTE: the whole body is currently compiled out with `#if 0`, so as built the function
// returns 0 without touching *ppWriter.
//
// When enabled: allocates a zeroed STsdbSnapWriter, copies the current file system
// state into it, caches configuration values (days, precision, min/max rows,
// compression, commit id) and creates the block-data buffers and index arrays used by
// the data and del paths. On success *ppWriter receives the writer; on failure
// *ppWriter is set to NULL.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when an allocation fails, otherwise the
// first non-zero code from a callee.
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) {
  int32_t code = 0;
#if 0
  STsdbSnapWriter* pWriter = NULL;

  // alloc
  pWriter = (STsdbSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
  if (pWriter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pWriter->pTsdb = pTsdb;
  pWriter->sver = sver;
  pWriter->ever = ever;

  code = tsdbFSCopy(pTsdb, &pWriter->fs);
  if (code) goto _err;

  // config
  pWriter->minutes = pTsdb->keepCfg.days;
  pWriter->precision = pTsdb->keepCfg.precision;
  pWriter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
  pWriter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
  pWriter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
  pWriter->commitID = pTsdb->pVnode->state.commitID;

  // for data file
  code = tBlockDataCreate(&pWriter->bData);
  if (code) goto _err;

  pWriter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pWriter->aBlockIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  code = tBlockDataCreate(&pWriter->bDataR);
  if (code) goto _err;

  pWriter->aBlockIdxW = taosArrayInit(0, sizeof(SBlockIdx));
  if (pWriter->aBlockIdxW == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  code = tBlockDataCreate(&pWriter->bDataW);
  if (code) goto _err;

  // for del file
  pWriter->aDelIdxR = taosArrayInit(0, sizeof(SDelIdx));
  if (pWriter->aDelIdxR == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pWriter->aDelData = taosArrayInit(0, sizeof(SDelData));
  if (pWriter->aDelData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pWriter->aDelIdxW = taosArrayInit(0, sizeof(SDelIdx));
  if (pWriter->aDelIdxW == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  *ppWriter = pWriter;

  tsdbInfo("vgId:%d, tsdb snapshot writer open for %s succeed", TD_VID(pTsdb->pVnode), pTsdb->path);
  return code;
_err:
  tsdbError("vgId:%d, tsdb snapshot writer open for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path,
            tstrerror(code));
  // Release the writer struct so a failed open does not leak it.
  // NOTE(review): members created before the failure (block data, arrays, the fs copy)
  // are still not released here; add their destroy calls once the right APIs are confirmed.
  if (pWriter) taosMemoryFree(pWriter);
  *ppWriter = NULL;
#endif
  return code;
}

H
Hongze Cheng 已提交
1175
// Close a snapshot writer and release the writer struct.
//
// NOTE: the whole body is currently compiled out with `#if 0`, so as built the function
// returns 0 and leaves *ppWriter untouched.
//
// When enabled: with `rollback` set the function hits ASSERT(0) (rollback is not
// implemented here). Otherwise it finishes the pending data file set and del file and
// then runs the two file-system commit steps. On both the success and the error path
// the writer struct is freed and *ppWriter is set to NULL.
//
// Returns 0 on success, otherwise the first non-zero code from a callee.
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
  int32_t code = 0;
#if 0
  STsdbSnapWriter* pWriter = *ppWriter;

  if (rollback) {
    ASSERT(0);
    // code = tsdbFSRollback(pWriter->pTsdb->pFS);
    // if (code) goto _err;
  } else {
    code = tsdbSnapWriteDataEnd(pWriter);
    if (code) goto _err;

    code = tsdbSnapWriteDelEnd(pWriter);
    if (code) goto _err;

    code = tsdbFSCommit1(pWriter->pTsdb, &pWriter->fs);
    if (code) goto _err;

    code = tsdbFSCommit2(pWriter->pTsdb, &pWriter->fs);
    if (code) goto _err;
  }

  // NOTE(review): only the writer struct itself is freed below; the arrays and block-data
  // buffers created in tsdbSnapWriterOpen() are not released here — confirm ownership.
  tsdbInfo("vgId:%d, vnode snapshot tsdb writer close for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
  taosMemoryFree(pWriter);
  *ppWriter = NULL;
  return code;

_err:
  tsdbError("vgId:%d, vnode snapshot tsdb writer close for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode),
            pWriter->pTsdb->path, tstrerror(code));
  taosMemoryFree(pWriter);
  *ppWriter = NULL;
#endif
  return code;
}

// Dispatch one snapshot packet to the matching write path.
//
// NOTE: the whole body is currently compiled out with `#if 0`, so as built the function
// returns 0 without reading the packet.
//
// When enabled: `pData` starts with an SSnapDataHdr whose `type` selects the path.
//   - SNAP_DATA_TSDB: the packet goes to tsdbSnapWriteData().
//   - any other type: an open data file set is finished first; if the type is
//     SNAP_DATA_DEL the packet then goes to tsdbSnapWriteDel(). Other types are not
//     processed further and the function reports success.
//
// Returns 0 on success, otherwise the first non-zero code from a callee.
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
  int32_t code = 0;
#if 0
  SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;

  // ts data
  if (pHdr->type == SNAP_DATA_TSDB) {
    code = tsdbSnapWriteData(pWriter, pData, nData);
    if (code) goto _err;

    goto _exit;
  } else {
    // a non-data packet arrived: finish the data file set that is still open
    if (pWriter->pDataFWriter) {
      code = tsdbSnapWriteDataEnd(pWriter);
      if (code) goto _err;
    }
  }

  // del data
  if (pHdr->type == SNAP_DATA_DEL) {
    code = tsdbSnapWriteDel(pWriter, pData, nData);
    if (code) goto _err;
  }

_exit:
  tsdbDebug("vgId:%d, tsdb snapshot write for %s succeed", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);

  return code;

_err:
  tsdbError("vgId:%d, tsdb snapshot write for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path,
            tstrerror(code));
#endif
  return code;
}