tsdbSnapshot.c 44.9 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

H
Hongze Cheng 已提交
18 19 20
extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo);
extern int32_t tsdbWriteDataBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SMapData* mDataBlk, int8_t cmprAlg);
extern int32_t tsdbWriteSttBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SArray* aSttBlk, int8_t cmprAlg);
H
Hongze Cheng 已提交
21

H
Hongze Cheng 已提交
22
// STsdbSnapReader ========================================
H
Hongze Cheng 已提交
23
struct STsdbSnapReader {
H
Hongze Cheng 已提交
24 25 26 27 28
  STsdb*   pTsdb;
  int64_t  sver;
  int64_t  ever;
  int8_t   type;
  uint8_t* aBuf[5];
H
Hongze Cheng 已提交
29

H
Hongze Cheng 已提交
30
  STsdbFS  fs;
H
Hongze Cheng 已提交
31
  TABLEID  tbid;
H
Hongze Cheng 已提交
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
  SSkmInfo skmTable;

  // timeseries data
  int8_t  dataDone;
  int32_t fid;

  SDataFReader*   pDataFReader;
  STsdbDataIter2* iterList;
  STsdbDataIter2* pIter;
  SRBTree         rbt;
  SBlockData      bData;

  // tombstone data
  int8_t          delDone;
  SDelFReader*    pDelFReader;
  STsdbDataIter2* pTIter;
H
Hongze Cheng 已提交
48
  SArray*         aDelData;
H
Hongze Cheng 已提交
49
};
H
Hongze Cheng 已提交
50

H
Hongze Cheng 已提交
51
static int32_t tsdbSnapReadFileDataStart(STsdbSnapReader* pReader) {
H
Hongze Cheng 已提交
52
  int32_t code = 0;
H
add log  
Hongze Cheng 已提交
53
  int32_t lino = 0;
H
Hongze Cheng 已提交
54

H
Hongze Cheng 已提交
55 56 57 58 59
  SDFileSet* pSet = taosArraySearch(pReader->fs.aDFileSet, &(SDFileSet){.fid = pReader->fid}, tDFileSetCmprFn, TD_GT);
  if (pSet == NULL) {
    pReader->fid = INT32_MAX;
    goto _exit;
  }
H
Hongze Cheng 已提交
60

H
Hongze Cheng 已提交
61 62
  pReader->fid = pSet->fid;

H
Hongze Cheng 已提交
63
  tRBTreeCreate(&pReader->rbt, tsdbDataIterCmprFn);
H
Hongze Cheng 已提交
64

H
Hongze Cheng 已提交
65
  code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet);
H
add log  
Hongze Cheng 已提交
66
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
67

H
Hongze Cheng 已提交
68 69
  code = tsdbOpenDataFileDataIter(pReader->pDataFReader, &pReader->pIter);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
70

H
Hongze Cheng 已提交
71 72
  if (pReader->pIter) {
    // iter to next with filter info (sver, ever)
H
Hongze Cheng 已提交
73 74 75 76 77
    code = tsdbDataIterNext2(
        pReader->pIter,
        &(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_VERSION | TSDB_FILTER_FLAG_IGNORE_DROPPED_TABLE,  // flag
                           .sver = pReader->sver,
                           .ever = pReader->ever});
H
add log  
Hongze Cheng 已提交
78
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
79

H
Hongze Cheng 已提交
80 81 82
    if (pReader->pIter->rowInfo.suid || pReader->pIter->rowInfo.uid) {
      // add to rbtree
      tRBTreePut(&pReader->rbt, &pReader->pIter->rbtn);
H
Hongze Cheng 已提交
83

H
Hongze Cheng 已提交
84 85 86 87 88
      // add to iterList
      pReader->pIter->next = pReader->iterList;
      pReader->iterList = pReader->pIter;
    } else {
      tsdbCloseDataIter2(pReader->pIter);
H
Hongze Cheng 已提交
89 90 91
    }
  }

H
Hongze Cheng 已提交
92 93
  for (int32_t iStt = 0; iStt < pSet->nSttF; ++iStt) {
    code = tsdbOpenSttFileDataIter(pReader->pDataFReader, iStt, &pReader->pIter);
H
add log  
Hongze Cheng 已提交
94
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
95 96

    if (pReader->pIter) {
H
Hongze Cheng 已提交
97
      // iter to valid row
H
Hongze Cheng 已提交
98 99 100 101 102
      code = tsdbDataIterNext2(
          pReader->pIter,
          &(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_VERSION | TSDB_FILTER_FLAG_IGNORE_DROPPED_TABLE,  // flag
                             .sver = pReader->sver,
                             .ever = pReader->ever});
H
add log  
Hongze Cheng 已提交
103
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
104

H
Hongze Cheng 已提交
105 106 107
      if (pReader->pIter->rowInfo.suid || pReader->pIter->rowInfo.uid) {
        // add to rbtree
        tRBTreePut(&pReader->rbt, &pReader->pIter->rbtn);
H
Hongze Cheng 已提交
108

H
Hongze Cheng 已提交
109 110 111 112 113
        // add to iterList
        pReader->pIter->next = pReader->iterList;
        pReader->iterList = pReader->pIter;
      } else {
        tsdbCloseDataIter2(pReader->pIter);
H
Hongze Cheng 已提交
114 115 116 117
      }
    }
  }

H
Hongze Cheng 已提交
118
  pReader->pIter = NULL;
H
Hongze Cheng 已提交
119

H
add log  
Hongze Cheng 已提交
120 121
_exit:
  if (code) {
H
Hongze Cheng 已提交
122
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
H
add log  
Hongze Cheng 已提交
123
  } else {
H
Hongze Cheng 已提交
124
    tsdbInfo("vgId:%d %s done, fid:%d", TD_VID(pReader->pTsdb->pVnode), __func__, pReader->fid);
H
add log  
Hongze Cheng 已提交
125
  }
H
Hongze Cheng 已提交
126 127 128
  return code;
}

H
Hongze Cheng 已提交
129 130 131 132 133 134
// Finish reading the current file set: close every iterator registered in
// iterList, then close the data file reader.
static void tsdbSnapReadFileDataEnd(STsdbSnapReader* pReader) {
  for (STsdbDataIter2* pCur = pReader->iterList; pCur != NULL; pCur = pReader->iterList) {
    // unlink before closing so the list head never points at a closed iterator
    pReader->iterList = pCur->next;
    tsdbCloseDataIter2(pCur);
  }

  tsdbDataFReaderClose(&pReader->pDataFReader);
}
H
Hongze Cheng 已提交
138

H
Hongze Cheng 已提交
139 140 141
// Move the reader to the next row across all iterators of the current file set.
//
// The current iterator is advanced first. It stays current while it still
// compares lower than the minimum of the tree; otherwise it is put back into
// the tree (or dropped when exhausted) and the tree minimum becomes current.
//
// ppRowInfo (optional) receives the new current row, or NULL when no iterator
// has rows left. Returns 0 on success, otherwise the iterator's error code.
static int32_t tsdbSnapReadNextRow(STsdbSnapReader* pReader, SRowInfo** ppRowInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdbDataIter2* pCur = pReader->pIter;
  if (pCur != NULL) {
    STsdbFilterInfo filter = {.flag = TSDB_FILTER_FLAG_BY_VERSION | TSDB_FILTER_FLAG_IGNORE_DROPPED_TABLE,
                              .sver = pReader->sver,
                              .ever = pReader->ever};

    code = tsdbDataIterNext2(pCur, &filter);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (pCur->rowInfo.suid == 0 && pCur->rowInfo.uid == 0) {
      // iterator exhausted; it remains in iterList and is closed with the file set
      pReader->pIter = NULL;
    } else {
      SRBTreeNode* pMin = tRBTreeMin(&pReader->rbt);
      if (pMin != NULL) {
        int32_t c = tsdbDataIterCmprFn(&pCur->rbtn, pMin);
        if (c > 0) {
          // another iterator now holds the smaller row
          tRBTreePut(&pReader->rbt, &pCur->rbtn);
          pReader->pIter = NULL;
        } else if (c == 0) {
          ASSERT(0);
        }
      }
    }
  }

  if (pReader->pIter == NULL) {
    SRBTreeNode* pMin = tRBTreeMin(&pReader->rbt);
    if (pMin != NULL) {
      tRBTreeDrop(&pReader->rbt, pMin);
      pReader->pIter = TSDB_RBTN_TO_DATA_ITER(pMin);
    }
  }

  if (ppRowInfo != NULL) {
    *ppRowInfo = pReader->pIter ? &pReader->pIter->rowInfo : NULL;
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
189
// Return the current row without advancing. When there is no current iterator,
// fall back to tsdbSnapReadNextRow to select one.
static int32_t tsdbSnapReadGetRow(STsdbSnapReader* pReader, SRowInfo** ppRowInfo) {
  if (pReader->pIter == NULL) {
    return tsdbSnapReadNextRow(pReader, ppRowInfo);
  }

  *ppRowInfo = &pReader->pIter->rowInfo;
  return 0;
}

H
Hongze Cheng 已提交
198 199 200
// Serialize pReader->bData (encoded with NO_COMPRESSION) into a newly
// allocated snapshot message: an SSnapDataHdr followed by the encoded parts in
// the order aBuf[3], aBuf[2], aBuf[1], aBuf[0].
//
// On success *ppData holds the message. On failure a non-zero code is
// returned; *ppData is NULL if the allocation failed and untouched if the
// encoding failed.
static int32_t tsdbSnapCmprData(STsdbSnapReader* pReader, uint8_t** ppData) {
  ASSERT(pReader->bData.nRow);

  int32_t aBufN[5] = {0};
  int32_t code = tCmprBlockData(&pReader->bData, NO_COMPRESSION, NULL, NULL, pReader->aBuf, aBufN);
  if (code) {
    return code;
  }

  int32_t size = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3];
  *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
  if (*ppData == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSnapDataHdr* pHdr = (SSnapDataHdr*)*ppData;
  pHdr->type = pReader->type;
  pHdr->size = size;

  // payload layout: [3][2][1][0]; parts 1 and 0 may be empty
  int32_t offset = 0;

  memcpy(pHdr->data + offset, pReader->aBuf[3], aBufN[3]);
  offset += aBufN[3];

  memcpy(pHdr->data + offset, pReader->aBuf[2], aBufN[2]);
  offset += aBufN[2];

  if (aBufN[1]) {
    memcpy(pHdr->data + offset, pReader->aBuf[1], aBufN[1]);
  }
  offset += aBufN[1];

  if (aBufN[0]) {
    memcpy(pHdr->data + offset, pReader->aBuf[0], aBufN[0]);
  }

  return code;
}

H
Hongze Cheng 已提交
231
// Produce the next time-series snapshot message.
//
// Rows are pulled from the current file set (opening the next one when needed)
// into pReader->bData for as long as they satisfy TABLE_SAME_SCHEMA against the
// block, up to a row limit, and the block is then serialized into *ppData.
// *ppData is not written when no file set has rows left.
//
// Returns 0 on success, otherwise the first error code encountered.
static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t code = 0;
  int32_t lino = 0;

  const int32_t maxRowsPerMsg = 81920;  // stop filling the block at this many rows
  STsdb*        pTsdb = pReader->pTsdb;
  SBlockData*   pBlock = &pReader->bData;

  tBlockDataReset(pBlock);

  for (;;) {
    // start a new file read if need
    if (pReader->pDataFReader == NULL) {
      code = tsdbSnapReadFileDataStart(pReader);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // still no reader: every file set has been consumed
    if (pReader->pDataFReader == NULL) break;

    SRowInfo* pRow;
    code = tsdbSnapReadGetRow(pReader, &pRow);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (pRow == NULL) {
      // current file set is drained, try the next one
      tsdbSnapReadFileDataEnd(pReader);
      continue;
    }

    code = tsdbUpdateTableSchema(pTsdb->pVnode->pMeta, pRow->suid, pRow->uid, &pReader->skmTable);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tBlockDataInit(pBlock, (TABLEID*)pRow, pReader->skmTable.pTSchema, NULL, 0);
    TSDB_CHECK_CODE(code, lino, _exit);

    for (;;) {
      if (!TABLE_SAME_SCHEMA(pBlock->suid, pBlock->uid, pRow->suid, pRow->uid)) break;

      if (pBlock->uid && pBlock->uid != pRow->uid) {
        // a second table joins the block: switch from a single block uid to per-row uids
        code = tRealloc((uint8_t**)&pBlock->aUid, sizeof(int64_t) * (pBlock->nRow + 1));
        TSDB_CHECK_CODE(code, lino, _exit);

        for (int32_t iRow = 0; iRow < pBlock->nRow; ++iRow) {
          pBlock->aUid[iRow] = pBlock->uid;
        }
        pBlock->uid = 0;
      }

      code = tBlockDataAppendRow(pBlock, &pRow->row, NULL, pRow->uid);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = tsdbSnapReadNextRow(pReader, &pRow);
      TSDB_CHECK_CODE(code, lino, _exit);

      if (pBlock->nRow >= maxRowsPerMsg || pRow == NULL) break;
    }

    ASSERT(pReader->bData.nRow > 0);

    break;
  }

  if (pBlock->nRow > 0) {
    ASSERT(pReader->bData.suid || pReader->bData.uid);

    code = tsdbSnapCmprData(pReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
304
// Serialize the collected tombstone records of table pReader->tbid into a
// newly allocated SNAP_DATA_DEL message: SSnapDataHdr, then the TABLEID, then
// every SDelData of pReader->aDelData encoded with tPutDelData.
//
// *ppData always receives the result: the message on success, NULL when the
// allocation fails (TSDB_CODE_OUT_OF_MEMORY is returned in that case).
static int32_t tsdbSnapCmprTombData(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t code = 0;
  int32_t lino = 0;

  int32_t nDelData = taosArrayGetSize(pReader->aDelData);

  // first pass: compute the payload size
  int64_t size = sizeof(TABLEID);
  for (int32_t i = 0; i < nDelData; ++i) {
    size += tPutDelData(NULL, taosArrayGet(pReader->aDelData, i));
  }

  uint8_t* pData = (uint8_t*)taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
  if (pData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
  pHdr->type = SNAP_DATA_DEL;
  pHdr->size = size;

  TABLEID* pId = (TABLEID*)(pData + sizeof(SSnapDataHdr));
  *pId = pReader->tbid;

  // second pass: encode the records right after the table id
  int64_t offset = sizeof(SSnapDataHdr) + sizeof(TABLEID);
  for (int32_t i = 0; i < nDelData; ++i) {
    offset += tPutDelData(pData + offset, taosArrayGet(pReader->aDelData, i));
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  *ppData = pData;
  return code;
}
H
Hongze Cheng 已提交
338

H
Hongze Cheng 已提交
339 340 341 342 343
// Return the tombstone record the iterator currently points at, or NULL when
// there is no iterator or its record has both suid and uid equal to zero.
static void tsdbSnapReadGetTombData(STsdbSnapReader* pReader, SDelInfo** ppDelInfo) {
  STsdbDataIter2* pIter = pReader->pTIter;

  *ppDelInfo = NULL;
  if (pIter != NULL && (pIter->delInfo.suid != 0 || pIter->delInfo.uid != 0)) {
    *ppDelInfo = &pIter->delInfo;
  }
}
H
Hongze Cheng 已提交
346

H
Hongze Cheng 已提交
347
// Advance the tombstone iterator to its next record accepted by the reader's
// filter and, when ppDelInfo is given, report that record (NULL at the end).
//
// Returns 0 on success, otherwise the iterator's error code.
static int32_t tsdbSnapReadNextTombData(STsdbSnapReader* pReader, SDelInfo** ppDelInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdbFilterInfo filter = {.flag = TSDB_FILTER_FLAG_BY_VERSION | TSDB_FILTER_FLAG_IGNORE_DROPPED_TABLE,
                            .sver = pReader->sver,
                            .ever = pReader->ever};

  code = tsdbDataIterNext2(pReader->pTIter, &filter);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (ppDelInfo != NULL) {
    tsdbSnapReadGetTombData(pReader, ppDelInfo);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
368
// Produce the next tombstone snapshot message.
//
// On first use the del file reader and its iterator are opened. Then all
// consecutive records belonging to the same table (suid, uid) are gathered
// into pReader->aDelData and serialized into *ppData. *ppData is not written
// when there is no del file or no record is left.
//
// Returns 0 on success, otherwise the first error code encountered.
static int32_t tsdbSnapReadTombData(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdb* pTsdb = pReader->pTsdb;

  // open tombstone data iter if need
  if (pReader->pDelFReader == NULL) {
    if (pReader->fs.pDelFile == NULL) goto _exit;

    code = tsdbDelFReaderOpen(&pReader->pDelFReader, pReader->fs.pDelFile, pTsdb);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbOpenTombFileDataIter(pReader->pDelFReader, &pReader->pTIter);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (pReader->pTIter != NULL) {
      // position the iterator on its first matching record
      code = tsdbSnapReadNextTombData(pReader, NULL);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  // loop to get tombstone data
  SDelInfo* pDel;
  tsdbSnapReadGetTombData(pReader, &pDel);

  if (pDel == NULL) goto _exit;

  pReader->tbid = *(TABLEID*)pDel;

  // reuse the record array across calls, creating it on first need
  if (pReader->aDelData == NULL) {
    pReader->aDelData = taosArrayInit(16, sizeof(SDelData));
    if (pReader->aDelData == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  } else {
    taosArrayClear(pReader->aDelData);
  }

  while (pDel != NULL && pDel->suid == pReader->tbid.suid && pDel->uid == pReader->tbid.uid) {
    if (taosArrayPush(pReader->aDelData, &pDel->delData) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbSnapReadNextTombData(pReader, &pDel);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // encode tombstone data
  if (taosArrayGetSize(pReader->aDelData) > 0) {
    code = tsdbSnapCmprTombData(pReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
  }
  return code;
}
H
more  
Hongze Cheng 已提交
430

C
Cary Xu 已提交
431
// Create a snapshot reader over `pTsdb` for the version range [sver, ever]
// handed to the data iterators' version filter; `type` is stamped on every
// time-series message the reader emits.
//
// The reader references the current file-system state via tsdbFSRef while
// holding the TSDB read lock. On success *ppReader receives the new reader;
// on failure everything is released and *ppReader is set to NULL.
//
// Returns 0 on success, otherwise an error code.
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, STsdbSnapReader** ppReader) {
  int32_t code = 0;
  int32_t lino = 0;

  // alloc
  STsdbSnapReader* pReader = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  pReader->pTsdb = pTsdb;
  pReader->sver = sver;
  pReader->ever = ever;
  pReader->type = type;

  // take the file-system reference under the read lock; unlock before checking the result
  taosThreadRwlockRdlock(&pTsdb->rwLock);
  code = tsdbFSRef(pTsdb, &pReader->fs);
  taosThreadRwlockUnlock(&pTsdb->rwLock);
  TSDB_CHECK_CODE(code, lino, _exit);

  // init: INT32_MIN makes the first "greater than fid" search match the first file set
  pReader->fid = INT32_MIN;

  code = tBlockDataCreate(&pReader->bData);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(pTsdb->pVnode),
              __func__, lino, tstrerror(code), sver, ever, type);
    if (pReader != NULL) {
      tBlockDataDestroy(&pReader->bData);
      tsdbFSUnref(pTsdb, &pReader->fs);
      taosMemoryFree(pReader);
      pReader = NULL;
    }
  } else {
    tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(pTsdb->pVnode), __func__, sver, ever,
             type);
  }
  *ppReader = pReader;
  return code;
}

H
Hongze Cheng 已提交
478
// Destroy a snapshot reader and everything it owns: tombstone iterator and del
// file reader, all data iterators and the data file reader, block data, cached
// schema, the file-system reference and the scratch buffers.
//
// *ppReader is set to NULL. Calling this with a NULL pointer or with a reader
// that is already NULL (e.g. after tsdbSnapReaderOpen failed) is a no-op.
//
// Always returns 0; none of the teardown steps reports an error.
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
  if (ppReader == NULL || *ppReader == NULL) {
    return 0;
  }

  STsdbSnapReader* pReader = *ppReader;
  STsdb*           pTsdb = pReader->pTsdb;  // kept for logging after pReader is freed

  // tombstone
  if (pReader->pTIter) {
    tsdbCloseDataIter2(pReader->pTIter);
    pReader->pTIter = NULL;
  }
  if (pReader->pDelFReader) {
    tsdbDelFReaderClose(&pReader->pDelFReader);
  }
  taosArrayDestroy(pReader->aDelData);

  // timeseries
  while (pReader->iterList) {
    STsdbDataIter2* pIter = pReader->iterList;
    pReader->iterList = pIter->next;
    tsdbCloseDataIter2(pIter);
  }
  if (pReader->pDataFReader) {
    tsdbDataFReaderClose(&pReader->pDataFReader);
  }
  tBlockDataDestroy(&pReader->bData);

  // other
  tDestroyTSchema(pReader->skmTable.pTSchema);
  tsdbFSUnref(pReader->pTsdb, &pReader->fs);
  for (size_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(pReader->aBuf[0]); iBuf++) {
    tFree(pReader->aBuf[iBuf]);
  }
  taosMemoryFree(pReader);

  tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);

  *ppReader = NULL;
  return 0;
}

// Read the next snapshot message from the reader.
//
// Time-series messages are returned until that source is drained, then
// tombstone messages. *ppData is NULL on return when both sources are
// exhausted (or on error).
//
// Returns 0 on success, otherwise the first error code encountered.
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) {
  int32_t code = 0;
  int32_t lino = 0;

  *ppData = NULL;

  // read data file
  if (!pReader->dataDone) {
    code = tsdbSnapReadTimeSeriesData(pReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (*ppData != NULL) goto _exit;

    // nothing produced: time-series data is finished
    pReader->dataDone = 1;
  }

  // read del file
  if (!pReader->delDone) {
    code = tsdbSnapReadTombData(pReader, ppData);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (*ppData != NULL) goto _exit;

    // nothing produced: tombstone data is finished
    pReader->delDone = 1;
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(pReader->pTsdb->pVnode), __func__);
  }
  return code;
}

H
Hongze Cheng 已提交
561
// STsdbSnapWriter ========================================
H
Hongze Cheng 已提交
562
struct STsdbSnapWriter {
H
Hongze Cheng 已提交
563 564 565
  STsdb*   pTsdb;
  int64_t  sver;
  int64_t  ever;
H
Hongze Cheng 已提交
566 567 568 569 570 571
  int32_t  minutes;
  int8_t   precision;
  int32_t  minRow;
  int32_t  maxRow;
  int8_t   cmprAlg;
  int64_t  commitID;
H
Hongze Cheng 已提交
572
  uint8_t* aBuf[5];
H
Hongze Cheng 已提交
573

H
Hongze Cheng 已提交
574
  STsdbFS fs;
H
Hongze Cheng 已提交
575
  TABLEID tbid;
H
Hongze Cheng 已提交
576

H
Hongze Cheng 已提交
577 578 579 580 581 582 583 584 585 586
  // time-series data
  SBlockData inData;

  int32_t  fid;
  SSkmInfo skmTable;

  /* reader */
  SDataFReader*   pDataFReader;
  STsdbDataIter2* iterList;
  STsdbDataIter2* pDIter;
H
Hongze Cheng 已提交
587
  STsdbDataIter2* pSIter;
H
Hongze Cheng 已提交
588 589 590 591 592 593 594 595 596 597 598
  SRBTree         rbt;  // SRBTree<STsdbDataIter2>

  /* writer */
  SDataFWriter* pDataFWriter;
  SArray*       aBlockIdx;
  SMapData      mDataBlk;  // SMapData<SDataBlk>
  SArray*       aSttBlk;   // SArray<SSttBlk>
  SBlockData    bData;
  SBlockData    sData;

  // tombstone data
H
Hongze Cheng 已提交
599 600 601 602 603
  /* reader */
  SDelFReader*    pDelFReader;
  STsdbDataIter2* pTIter;

  /* writer */
H
Hongze Cheng 已提交
604
  SDelFWriter* pDelFWriter;
H
Hongze Cheng 已提交
605
  SArray*      aDelIdx;
H
Hongze Cheng 已提交
606
  SArray*      aDelData;
H
Hongze Cheng 已提交
607 608
};

H
Hongze Cheng 已提交
609
// SNAP_DATA_TSDB
H
Hongze Cheng 已提交
610
// Begin writing data of table `pId` (or, with pId == NULL, of a sentinel table
// id {INT64_MAX, INT64_MAX} that sorts after every block-index entry).
//
// Steps:
//  1. Walk the existing data file's block index: entries of tables ordered
//     before the target are copied to the new file unchanged; an entry of the
//     target table itself is loaded so its blocks can be merged with new rows.
//  2. For a real table, refresh the schema and reset mDataBlk / bData.
//  3. If the pending stt block no longer satisfies TABLE_SAME_SCHEMA against
//     the target table, flush it and (for a real table) re-initialize it.
//
// Returns 0 on success, otherwise the first error code encountered.
static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pId) {
  int32_t code = 0;
  int32_t lino = 0;

  pWriter->tbid = pId ? *pId : (TABLEID){INT64_MAX, INT64_MAX};

  if (pWriter->pDIter) {
    STsdbDataIter2* pIter = pWriter->pDIter;

    // assert last table data end
    ASSERT(pIter->dIter.iRow >= pIter->dIter.bData.nRow);
    ASSERT(pIter->dIter.iDataBlk >= pIter->dIter.mDataBlk.nItem);

    for (;;) {
      if (pIter->dIter.iBlockIdx >= taosArrayGetSize(pIter->dIter.aBlockIdx)) {
        // block index fully consumed: no existing data left to merge
        pWriter->pDIter = NULL;
        break;
      }

      SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->dIter.aBlockIdx, pIter->dIter.iBlockIdx);

      int32_t c = tTABLEIDCmprFn(pBlockIdx, &pWriter->tbid);
      if (c > 0) {
        // next existing table sorts after the target: mark "no existing blocks" for it
        pIter->dIter.iDataBlk = pIter->dIter.mDataBlk.nItem;
        break;
      }

      code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk);
      TSDB_CHECK_CODE(code, lino, _exit);

      if (c == 0) {
        // existing blocks of the target table: start merging from the first one
        pIter->dIter.iDataBlk = 0;
        pIter->dIter.iBlockIdx++;
        break;
      }

      // c < 0: a table before the target, carry its block map over as is
      SBlockIdx* pNewBlockIdx = taosArrayReserve(pWriter->aBlockIdx, 1);
      if (pNewBlockIdx == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      pNewBlockIdx->suid = pBlockIdx->suid;
      pNewBlockIdx->uid = pBlockIdx->uid;

      code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pIter->dIter.mDataBlk, pNewBlockIdx);
      TSDB_CHECK_CODE(code, lino, _exit);

      pIter->dIter.iBlockIdx++;
    }
  }

  if (pId) {
    code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pId->suid, pId->uid, &pWriter->skmTable);
    TSDB_CHECK_CODE(code, lino, _exit);

    tMapDataReset(&pWriter->mDataBlk);

    code = tBlockDataInit(&pWriter->bData, pId, pWriter->skmTable.pTSchema, NULL, 0);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (!TABLE_SAME_SCHEMA(pWriter->tbid.suid, pWriter->tbid.uid, pWriter->sData.suid, pWriter->sData.uid)) {
    // pending stt rows do not match the new table: write them out first
    if (pWriter->sData.nRow > 0) {
      code = tsdbWriteSttBlock(pWriter->pDataFWriter, &pWriter->sData, pWriter->aSttBlk, pWriter->cmprAlg);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    if (pId) {
      // with a non-zero suid the stt block is keyed by suid only (uid = 0)
      TABLEID id = {.suid = pWriter->tbid.suid, .uid = pWriter->tbid.suid ? 0 : pWriter->tbid.uid};
      code = tBlockDataInit(&pWriter->sData, &id, pWriter->skmTable.pTSchema, NULL, 0);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), __func__,
              pWriter->tbid.suid, pWriter->tbid.uid);
  }
  return code;
}

H
Hongze Cheng 已提交
701 702 703 704 705 706 707 708 709 710
// Append one row of the current table to the pending data block and flush the
// block to the data file once it has reached pWriter->maxRow rows.
//
// Returns 0 on success, otherwise the error code of the failing step.
static int32_t tsdbSnapWriteTableRowImpl(STsdbSnapWriter* pWriter, TSDBROW* pRow) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tBlockDataAppendRow(&pWriter->bData, pRow, pWriter->skmTable.pTSchema, pWriter->tbid.uid);
  TSDB_CHECK_CODE(code, lino, _exit);

  // block not full yet: keep accumulating
  if (pWriter->bData.nRow < pWriter->maxRow) goto _exit;

  code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
720
// Merge one incoming row into the table currently being written.
//
// Before the incoming row is appended, content still pending in the existing
// data-file iterator (pWriter->pDIter) that compares below the incoming key is
// carried over to the new file:
//   - rows of the iterator's loaded block are re-appended one by one;
//   - whole data blocks that compare below the key are carried over by copying
//     their SDataBlk entry into the new block map (no row decoding);
//   - a block that compares equal to the key range is loaded so that its rows
//     can be merged row by row on the next pass.
//
// pRow may be NULL: the key is then TSDBKEY_MAX and no row is appended at the
// end (presumably this drains whatever the iterator still holds - callers use
// it to end a table).
//
// An incoming key equal to an existing row key is not expected (ASSERT).
//
// Returns 0 on success, otherwise the first error code encountered.
static int32_t tsdbSnapWriteTableRow(STsdbSnapWriter* pWriter, TSDBROW* pRow) {
  int32_t code = 0;
  int32_t lino = 0;

  TSDBKEY inKey = pRow ? TSDBROW_KEY(pRow) : TSDBKEY_MAX;

  // no existing data iterator, or it has neither rows nor blocks left
  if (pWriter->pDIter == NULL || (pWriter->pDIter->dIter.iRow >= pWriter->pDIter->dIter.bData.nRow &&
                                  pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem)) {
    goto _write_row;
  } else {
    for (;;) {
      // step 1: rows of the block currently loaded in the iterator
      while (pWriter->pDIter->dIter.iRow < pWriter->pDIter->dIter.bData.nRow) {
        TSDBROW row = tsdbRowFromBlockData(&pWriter->pDIter->dIter.bData, pWriter->pDIter->dIter.iRow);

        int32_t c = tsdbKeyCmprFn(&inKey, &TSDBROW_KEY(&row));
        if (c < 0) {
          // incoming row goes first; the existing row stays pending
          goto _write_row;
        } else if (c > 0) {
          code = tsdbSnapWriteTableRowImpl(pWriter, &row);
          TSDB_CHECK_CODE(code, lino, _exit);

          pWriter->pDIter->dIter.iRow++;
        } else {
          ASSERT(0);
        }
      }

      // step 2: remaining data blocks of the iterator
      for (;;) {
        if (pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem) goto _write_row;

        // FIXME: Here can be slow, use array instead
        SDataBlk dataBlk;
        tMapDataGetItemByIdx(&pWriter->pDIter->dIter.mDataBlk, pWriter->pDIter->dIter.iDataBlk, &dataBlk, tGetDataBlk);

        int32_t c = tDataBlkCmprFn(&dataBlk, &(SDataBlk){.minKey = inKey, .maxKey = inKey});
        if (c > 0) {
          goto _write_row;
        } else if (c < 0) {
          // write out buffered rows first, before the carried-over block entry
          // is added to the block map
          if (pWriter->bData.nRow > 0) {
            code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg);
            TSDB_CHECK_CODE(code, lino, _exit);
          }

          // carry the block entry over unchanged; a failure to append it must
          // not be ignored, otherwise the block would be missing from the new
          // block map
          code = tMapDataPutItem(&pWriter->mDataBlk, &dataBlk, tPutDataBlk);
          TSDB_CHECK_CODE(code, lino, _exit);

          pWriter->pDIter->dIter.iDataBlk++;
        } else {
          // key falls inside this block: load it and merge row by row
          code = tsdbReadDataBlockEx(pWriter->pDataFReader, &dataBlk, &pWriter->pDIter->dIter.bData);
          TSDB_CHECK_CODE(code, lino, _exit);

          pWriter->pDIter->dIter.iRow = 0;
          pWriter->pDIter->dIter.iDataBlk++;
          break;
        }
      }
    }
  }

_write_row:
  if (pRow) {
    code = tsdbSnapWriteTableRowImpl(pWriter, pRow);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
790 791
// Finish the table currently being written.
//
// A NULL row is pushed through tsdbSnapWriteTableRow first. Rows still left in
// bData are then either written as a regular data block (at least minRow rows)
// or moved row by row into the stt buffer sData (fewer than minRow rows).
// Finally, if the table produced any data block entries, a block index entry
// for the table is reserved and the block map is written.
//
// Returns 0 on success, otherwise the first error code encountered.
static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
  int32_t code = 0;
  int32_t lino = 0;

  // write a NULL row to end current table data write
  code = tsdbSnapWriteTableRow(pWriter, NULL);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (pWriter->bData.nRow > 0) {
    if (pWriter->bData.nRow >= pWriter->minRow) {
      // enough rows for a data block of its own
      code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg);
      TSDB_CHECK_CODE(code, lino, _exit);
    } else {
      // too few rows: move them into the stt buffer instead
      ASSERT(TABLE_SAME_SCHEMA(pWriter->sData.suid, pWriter->sData.uid, pWriter->tbid.suid, pWriter->tbid.uid));

      int32_t iRow = 0;
      while (iRow < pWriter->bData.nRow) {
        TSDBROW row = tsdbRowFromBlockData(&pWriter->bData, iRow);

        code = tBlockDataAppendRow(&pWriter->sData, &row, NULL, pWriter->tbid.uid);
        TSDB_CHECK_CODE(code, lino, _exit);

        if (pWriter->sData.nRow >= pWriter->maxRow) {
          code = tsdbWriteSttBlock(pWriter->pDataFWriter, &pWriter->sData, pWriter->aSttBlk, pWriter->cmprAlg);
          TSDB_CHECK_CODE(code, lino, _exit);
        }

        iRow++;
      }

      tBlockDataClear(&pWriter->bData);
    }
  }

  // record the table in the block index only if it owns data blocks
  if (pWriter->mDataBlk.nItem) {
    SBlockIdx* pBlockIdx = taosArrayReserve(pWriter->aBlockIdx, 1);
    if (pBlockIdx == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    pBlockIdx->uid = pWriter->tbid.uid;
    pBlockIdx->suid = pWriter->tbid.suid;

    code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pWriter->mDataBlk, pBlockIdx);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
840
// Start writing the file set identified by `fid`.
//
// Steps:
//   1. Look up an existing file set with this fid in the writer's file system
//      copy. If one exists, open a reader on it, one iterator on its data file
//      and one iterator per stt file; stt iterators that yield data are put
//      into the merge tree (rbt). Every opened iterator is linked into
//      iterList.
//   2. Open a writer for the new file set. Head and stt files get fresh
//      descriptors with the writer's commitID; data and sma descriptors are
//      reused from the existing set when there is one.
//   3. Reset the per-file block index, block map, stt block array and the two
//      block data buffers.
//
// Must only be called while no data file writer is open and with a fid greater
// than the previous one (ASSERT).
//
// Returns 0 on success, otherwise the first error code encountered.
static int32_t tsdbSnapWriteFileDataStart(STsdbSnapWriter* pWriter, int32_t fid) {
  int32_t code = 0;
  int32_t lino = 0;

  ASSERT(pWriter->pDataFWriter == NULL && pWriter->fid < fid);

  STsdb*     pTsdb = pWriter->pTsdb;
  SDFileSet* pSet = NULL;

  pWriter->fid = fid;
  pWriter->tbid = (TABLEID){0};
  pSet = taosArraySearch(pWriter->fs.aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ);

  // open reader
  pWriter->pDataFReader = NULL;
  pWriter->iterList = NULL;
  pWriter->pDIter = NULL;
  pWriter->pSIter = NULL;
  tRBTreeCreate(&pWriter->rbt, tsdbDataIterCmprFn);
  if (pSet != NULL) {
    code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet);
    TSDB_CHECK_CODE(code, lino, _exit);

    // iterator over the existing data file
    code = tsdbOpenDataFileDataIter(pWriter->pDataFReader, &pWriter->pDIter);
    TSDB_CHECK_CODE(code, lino, _exit);
    if (pWriter->pDIter != NULL) {
      pWriter->pDIter->next = pWriter->iterList;
      pWriter->iterList = pWriter->pDIter;
    }

    // one iterator per existing stt file
    int32_t iStt = 0;
    while (iStt < pSet->nSttF) {
      code = tsdbOpenSttFileDataIter(pWriter->pDataFReader, iStt, &pWriter->pSIter);
      TSDB_CHECK_CODE(code, lino, _exit);

      if (pWriter->pSIter != NULL) {
        // position on the first row
        code = tsdbDataIterNext2(pWriter->pSIter, NULL);
        TSDB_CHECK_CODE(code, lino, _exit);

        // add to tree
        tRBTreePut(&pWriter->rbt, &pWriter->pSIter->rbtn);

        // add to list
        pWriter->pSIter->next = pWriter->iterList;
        pWriter->iterList = pWriter->pSIter;
      }

      iStt++;
    }

    // pSIter was only a scratch slot above; no stt iterator is current yet
    pWriter->pSIter = NULL;
  }

  // open writer
  SDiskID diskId;
  if (pSet == NULL) {
    tfsAllocDisk(pTsdb->pVnode->pTfs, 0 /*TODO*/, &diskId);
    tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, diskId);
  } else {
    diskId = pSet->diskId;
  }
  SDFileSet wSet = {.diskId = diskId,
                    .fid = fid,
                    .pHeadF = &(SHeadFile){.commitID = pWriter->commitID},
                    .pDataF = (pSet) ? pSet->pDataF : &(SDataFile){.commitID = pWriter->commitID},
                    .pSmaF = (pSet) ? pSet->pSmaF : &(SSmaFile){.commitID = pWriter->commitID},
                    .nSttF = 1,
                    .aSttF = {&(SSttFile){.commitID = pWriter->commitID}}};
  code = tsdbDataFWriterOpen(&pWriter->pDataFWriter, pTsdb, &wSet);
  TSDB_CHECK_CODE(code, lino, _exit);

  // block index array: create on first use, otherwise just empty it
  if (pWriter->aBlockIdx == NULL) {
    pWriter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
    if (pWriter->aBlockIdx == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  } else {
    taosArrayClear(pWriter->aBlockIdx);
  }

  tMapDataReset(&pWriter->mDataBlk);

  // stt block array: same create-or-clear handling
  if (pWriter->aSttBlk == NULL) {
    pWriter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
    if (pWriter->aSttBlk == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  } else {
    taosArrayClear(pWriter->aSttBlk);
  }

  tBlockDataReset(&pWriter->bData);
  tBlockDataReset(&pWriter->sData);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code),
              fid);
  } else {
    tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(pTsdb->pVnode), __func__, fid);
  }
  return code;
}

H
Hongze Cheng 已提交
936 937 938 939 940 941
// Write one row described by pRowInfo, switching tables first when needed.
//
// A table switch happens when pRowInfo is NULL or its uid differs from the
// table currently being written: the current table (if any) is ended and
// tsdbSnapWriteTableDataStart is called with pRowInfo reinterpreted as a
// TABLEID. With a NULL pRowInfo nothing is written after the switch.
//
// Returns 0 on success, otherwise the first error code encountered.
static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  // switch to new table if need
  int32_t needSwitch = (pRowInfo == NULL) || (pRowInfo->uid != pWriter->tbid.uid);
  if (needSwitch) {
    if (pWriter->tbid.uid != 0) {
      code = tsdbSnapWriteTableDataEnd(pWriter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbSnapWriteTableDataStart(pWriter, (TABLEID*)pRowInfo);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // a NULL row info only ends the current table
  if (pRowInfo != NULL) {
    code = tsdbSnapWriteTableRow(pWriter, &pRowInfo->row);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
963
// Advance the merge over the existing stt iterators by one row.
//
// The current iterator (pWriter->pSIter) is stepped forward. If its row info
// then has suid == 0 and uid == 0 it is dropped as current (apparently the
// end-of-data marker - confirm against tsdbDataIterNext2). If it now compares
// greater than the smallest iterator in the tree it is put back into the tree.
// When no iterator is current afterwards, the smallest one is taken out of the
// tree and becomes current.
//
// If ppRowInfo is not NULL it receives the current iterator's row info, or
// NULL when no iterator is left.
//
// Returns 0 on success, otherwise the error code of tsdbDataIterNext2.
static int32_t tsdbSnapWriteNextRow(STsdbSnapWriter* pWriter, SRowInfo** ppRowInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pWriter->pSIter != NULL) {
    code = tsdbDataIterNext2(pWriter->pSIter, NULL);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (pWriter->pSIter->rowInfo.suid != 0 || pWriter->pSIter->rowInfo.uid != 0) {
      SRBTreeNode* pMin = tRBTreeMin(&pWriter->rbt);
      if (pMin != NULL) {
        int32_t c = tsdbDataIterCmprFn(&pWriter->pSIter->rbtn, pMin);
        if (c == 0) {
          // two iterators positioned on an identical row are not expected
          ASSERT(0);
        } else if (c > 0) {
          // another iterator is ahead now: park the current one in the tree
          tRBTreePut(&pWriter->rbt, &pWriter->pSIter->rbtn);
          pWriter->pSIter = NULL;
        }
      }
    } else {
      pWriter->pSIter = NULL;
    }
  }

  // pick the smallest parked iterator when none is current
  if (pWriter->pSIter == NULL) {
    SRBTreeNode* pMin = tRBTreeMin(&pWriter->rbt);
    if (pMin != NULL) {
      tRBTreeDrop(&pWriter->rbt, pMin);
      pWriter->pSIter = TSDB_RBTN_TO_DATA_ITER(pMin);
    }
  }

  if (ppRowInfo != NULL) {
    *ppRowInfo = (pWriter->pSIter != NULL) ? &pWriter->pSIter->rowInfo : NULL;
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
1010
// Return the row the stt merge is currently positioned on without advancing
// it. If no iterator is current yet, tsdbSnapWriteNextRow is used to select
// one (which may leave *ppRowInfo NULL when nothing is left).
//
// Returns 0 on success, otherwise the error code of tsdbSnapWriteNextRow.
static int32_t tsdbSnapWriteGetRow(STsdbSnapWriter* pWriter, SRowInfo** ppRowInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pWriter->pSIter == NULL) {
    code = tsdbSnapWriteNextRow(pWriter, ppRowInfo);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    *ppRowInfo = &pWriter->pSIter->rowInfo;
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
1029
// Finish the file set currently being written.
//
// Rows still pending in the existing stt iterators are written first, followed
// by a NULL row info that ends the last table. Then the stt block array, block
// index and file set header are written, the new file set is upserted into the
// writer's file system copy, and the data file writer, the data file reader
// (if any) and all iterators in iterList are closed.
//
// Requires an open data file writer (ASSERT).
//
// Returns 0 on success, otherwise the first error code encountered; on error
// the remaining steps, including closing the reader/iterators, are skipped.
static int32_t tsdbSnapWriteFileDataEnd(STsdbSnapWriter* pWriter) {
  int32_t code = 0;
  int32_t lino = 0;

  ASSERT(pWriter->pDataFWriter);

  // consume remain data and end with a NULL table row
  SRowInfo* pRowInfo = NULL;
  code = tsdbSnapWriteGetRow(pWriter, &pRowInfo);
  TSDB_CHECK_CODE(code, lino, _exit);
  for (;;) {
    // the final pass runs with pRowInfo == NULL, which ends the last table
    code = tsdbSnapWriteTableData(pWriter, pRowInfo);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (pRowInfo == NULL) break;

    code = tsdbSnapWriteNextRow(pWriter, &pRowInfo);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // do file-level updates
  code = tsdbWriteSttBlk(pWriter->pDataFWriter, pWriter->aSttBlk);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdx);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbUpdateDFileSetHeader(pWriter->pDataFWriter);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbFSUpsertFSet(&pWriter->fs, &pWriter->pDataFWriter->wSet);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 1);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (pWriter->pDataFReader) {
    code = tsdbDataFReaderClose(&pWriter->pDataFReader);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // clear sources
  while (pWriter->iterList) {
    STsdbDataIter2* pIter = pWriter->iterList;
    pWriter->iterList = pIter->next;
    tsdbCloseDataIter2(pIter);
  }

_exit:
  if (code) {
    // report the failing line like the other functions in this file do
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s is done", TD_VID(pWriter->pTsdb->pVnode), __func__);
  }
  return code;
}

H
Hongze Cheng 已提交
1086
// Handle one time-series snapshot message.
//
// The payload is decompressed into pWriter->inData. The file id is derived
// from the first timestamp of the block; if it differs from the file set being
// written, the current file set is ended (when one is open) and the new one is
// started. Each incoming row is then merged with the rows of the existing stt
// iterators: existing rows that compare lower are written first, then the
// incoming row. An incoming row equal to an existing one is not expected
// (ASSERT).
//
// Returns 0 on success, otherwise the first error code encountered.
static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr) {
  int32_t code = 0;
  int32_t lino = 0;

  SBlockData* pInData = &pWriter->inData;

  code = tDecmprBlockData(pHdr->data, pHdr->size, pInData, pWriter->aBuf);
  TSDB_CHECK_CODE(code, lino, _exit);

  ASSERT(pInData->nRow > 0);

  // switch to new data file if need
  int32_t fid = tsdbKeyFid(pInData->aTSKEY[0], pWriter->minutes, pWriter->precision);
  if (fid != pWriter->fid) {
    if (pWriter->pDataFWriter != NULL) {
      code = tsdbSnapWriteFileDataEnd(pWriter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbSnapWriteFileDataStart(pWriter, fid);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // loop write each row
  SRowInfo* pRowInfo;
  code = tsdbSnapWriteGetRow(pWriter, &pRowInfo);
  TSDB_CHECK_CODE(code, lino, _exit);

  for (int32_t iRow = 0; iRow < pInData->nRow; iRow++) {
    SRowInfo rInfo = {.suid = pInData->suid, .row = tsdbRowFromBlockData(pInData, iRow)};

    // a block-level uid of 0 means the uid is stored per row in aUid
    if (pInData->uid) {
      rInfo.uid = pInData->uid;
    } else {
      rInfo.uid = pInData->aUid[iRow];
    }

    for (;;) {
      // with no existing row left, the incoming row always goes next
      int32_t c = (pRowInfo == NULL) ? -1 : tRowInfoCmprFn(&rInfo, pRowInfo);

      if (c < 0) {
        code = tsdbSnapWriteTableData(pWriter, &rInfo);
        TSDB_CHECK_CODE(code, lino, _exit);
        break;
      }

      if (c > 0) {
        // existing row sorts first: write it and fetch the next one
        code = tsdbSnapWriteTableData(pWriter, pRowInfo);
        TSDB_CHECK_CODE(code, lino, _exit);

        code = tsdbSnapWriteNextRow(pWriter, &pRowInfo);
        TSDB_CHECK_CODE(code, lino, _exit);
      } else {
        ASSERT(0);
      }
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64 " nRow:%d", TD_VID(pWriter->pTsdb->pVnode), __func__,
              pWriter->inData.suid, pWriter->inData.uid, pWriter->inData.nRow);
  }
  return code;
}

// SNAP_DATA_DEL
H
Hongze Cheng 已提交
1151
// Start collecting tombstone data for table *pId.
//
// The current table id is set to *pId, or to {INT64_MAX, INT64_MAX} when pId
// is NULL, and the pending del-data array is emptied. Existing del-index
// entries of the tombstone iterator are then walked in order:
//   - entries that compare below the new table id are copied to the new del
//     file as they are (read, new index entry reserved, written);
//   - an entry for the same table has its del data loaded into
//     pWriter->aDelData, and the walk stops;
//   - the walk stops at the first entry that compares above the table id.
//
// Returns 0 on success, otherwise the first error code encountered.
static int32_t tsdbSnapWriteDelTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pId) {
  int32_t code = 0;
  int32_t lino = 0;

  pWriter->tbid = pId ? *pId : (TABLEID){.suid = INT64_MAX, .uid = INT64_MAX};

  taosArrayClear(pWriter->aDelData);

  STsdbDataIter2* pTIter = pWriter->pTIter;
  if (pTIter == NULL) {
    // no existing tombstone file to merge with
    goto _exit;
  }

  while (pTIter->tIter.iDelIdx < taosArrayGetSize(pTIter->tIter.aDelIdx)) {
    SDelIdx* pDelIdx = taosArrayGet(pTIter->tIter.aDelIdx, pTIter->tIter.iDelIdx);

    int32_t c = tTABLEIDCmprFn(pDelIdx, &pWriter->tbid);
    if (c > 0) {
      break;
    }

    if (c == 0) {
      // same table: existing del data becomes the start of the pending list
      code = tsdbReadDelDatav1(pWriter->pDelFReader, pDelIdx, pWriter->aDelData, INT64_MAX);
      TSDB_CHECK_CODE(code, lino, _exit);

      pTIter->tIter.iDelIdx++;
      break;
    }

    // earlier table: copy its del data to the new file unchanged
    code = tsdbReadDelDatav1(pWriter->pDelFReader, pDelIdx, pTIter->tIter.aDelData, INT64_MAX);
    TSDB_CHECK_CODE(code, lino, _exit);

    SDelIdx* pDelIdxNew = taosArrayReserve(pWriter->aDelIdx, 1);
    if (pDelIdxNew == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    pDelIdxNew->uid = pDelIdx->uid;
    pDelIdxNew->suid = pDelIdx->suid;

    code = tsdbWriteDelData(pWriter->pDelFWriter, pTIter->tIter.aDelData, pDelIdxNew);
    TSDB_CHECK_CODE(code, lino, _exit);

    pTIter->tIter.iDelIdx++;
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), __func__,
              pWriter->tbid.suid, pWriter->tbid.uid);
  }
  return code;
}

H
Hongze Cheng 已提交
1207
// Finish the tombstone data of the current table: when any del data is
// pending, reserve a del-index entry for the table and write the pending data
// through the del file writer.
//
// Returns 0 on success, otherwise the first error code encountered.
static int32_t tsdbSnapWriteDelTableDataEnd(STsdbSnapWriter* pWriter) {
  int32_t code = 0;
  int32_t lino = 0;

  if (taosArrayGetSize(pWriter->aDelData) > 0) {
    SDelIdx* pNewIdx = taosArrayReserve(pWriter->aDelIdx, 1);
    if (pNewIdx == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // the new index entry is keyed by the table being finished
    pNewIdx->uid = pWriter->tbid.uid;
    pNewIdx->suid = pWriter->tbid.suid;

    code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, pNewIdx);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbTrace("vgId:%d %s done", TD_VID(pWriter->pTsdb->pVnode), __func__);
  }
  return code;
}

H
Hongze Cheng 已提交
1234
// Add the tombstone records encoded in pData[0..size) to table *pId.
//
// When pId is NULL or names a different table than the current one, the
// current table (if any) is ended and the new one is started. With a NULL pId
// nothing is decoded afterwards. Otherwise the buffer is decoded record by
// record with tGetDelData and each record is appended to pWriter->aDelData;
// decoding is expected to consume exactly `size` bytes (ASSERT).
//
// Returns 0 on success, otherwise the first error code encountered.
static int32_t tsdbSnapWriteDelTableData(STsdbSnapWriter* pWriter, TABLEID* pId, uint8_t* pData, int64_t size) {
  int32_t code = 0;
  int32_t lino = 0;
  int64_t offset = 0;

  int32_t needSwitch = (pId == NULL) || (pId->uid != pWriter->tbid.uid);
  if (needSwitch) {
    if (pWriter->tbid.uid != 0) {
      code = tsdbSnapWriteDelTableDataEnd(pWriter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbSnapWriteDelTableDataStart(pWriter, pId);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (pId != NULL) {
    for (; offset < size;) {
      SDelData delData;
      offset += tGetDelData(pData + offset, &delData);

      if (taosArrayPush(pWriter->aDelData, &delData) == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }
    ASSERT(offset == size);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}
H
Hongze Cheng 已提交
1268

H
Hongze Cheng 已提交
1269 1270 1271
// Prepare for writing tombstone data: reset the current table id, open a
// reader and iterator on the existing del file (if the file system copy has
// one), open a writer for a new del file carrying the writer's commitID, and
// allocate the del-index and del-data arrays.
//
// Returns 0 on success, otherwise the first error code encountered.
static int32_t tsdbSnapWriteDelDataStart(STsdbSnapWriter* pWriter) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdb*    pTsdb = pWriter->pTsdb;
  SDelFile* pOldDelFile = pWriter->fs.pDelFile;
  SDelFile  newDelFile = {.commitID = pWriter->commitID};

  pWriter->tbid = (TABLEID){0};

  // reader
  if (pOldDelFile != NULL) {
    code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pOldDelFile, pTsdb);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbOpenTombFileDataIter(pWriter->pDelFReader, &pWriter->pTIter);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // writer
  code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &newDelFile, pTsdb);
  TSDB_CHECK_CODE(code, lino, _exit);

  pWriter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
  if (pWriter->aDelIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  pWriter->aDelData = taosArrayInit(0, sizeof(SDelData));
  if (pWriter->aDelData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
  }
  return code;
}

H
Hongze Cheng 已提交
1309
// Finish writing tombstone data.
//
// The last table is ended by passing a NULL table id to
// tsdbSnapWriteDelTableData. Then the del index and del file header are
// written, the new del file is upserted into the writer's file system copy,
// and the del file writer, the del file reader (if any) and the tombstone
// iterator (if any) are closed.
//
// Returns 0 on success, otherwise the first error code encountered; on error
// the remaining steps are skipped.
static int32_t tsdbSnapWriteDelDataEnd(STsdbSnapWriter* pWriter) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdb* pTsdb = pWriter->pTsdb;

  // end remaining table with NULL data
  code = tsdbSnapWriteDelTableData(pWriter, NULL, NULL, 0);
  TSDB_CHECK_CODE(code, lino, _exit);

  // update file-level info: index first, then the file header
  code = tsdbWriteDelIdx(pWriter->pDelFWriter, pWriter->aDelIdx);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter);
  TSDB_CHECK_CODE(code, lino, _exit);

  // register the new del file before the writer is closed
  code = tsdbFSUpsertDelFile(&pWriter->fs, &pWriter->pDelFWriter->fDel);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbDelFWriterClose(&pWriter->pDelFWriter, 1);
  TSDB_CHECK_CODE(code, lino, _exit);

  // release the read side, if an old del file was opened
  if (pWriter->pDelFReader != NULL) {
    code = tsdbDelFReaderClose(&pWriter->pDelFReader);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (pWriter->pTIter != NULL) {
    STsdbDataIter2* pTombIter = pWriter->pTIter;
    pWriter->pTIter = NULL;
    tsdbCloseDataIter2(pTombIter);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
  }
  return code;
}
H
Hongze Cheng 已提交
1350

H
Hongze Cheng 已提交
1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362
// Handle one tombstone snapshot message.
//
// The del file writer is set up on the first message. The payload is read as a
// TABLEID followed by encoded del-data records; both are passed to
// tsdbSnapWriteDelTableData.
//
// NOTE(review): pHdr->size is not checked against sizeof(TABLEID) here; a
// shorter payload would make the record length wrap/negative - confirm that
// the sender guarantees the minimum size.
//
// Returns 0 on success, otherwise the first error code encountered.
static int32_t tsdbSnapWriteDelData(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdb* pTsdb = pWriter->pTsdb;

  // start to write del data if need
  if (pWriter->pDelFWriter == NULL) {
    code = tsdbSnapWriteDelDataStart(pWriter);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // do write del data
  code = tsdbSnapWriteDelTableData(pWriter, (TABLEID*)pHdr->data, pHdr->data + sizeof(TABLEID),
                                   pHdr->size - sizeof(TABLEID));
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    // report the failing line like the other functions in this file do
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbTrace("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
  }
  return code;
}

H
Hongze Cheng 已提交
1377
// APIs
H
Hongze Cheng 已提交
1378
// Allocate and initialise a snapshot writer bound to pTsdb.
//
// The new writer stores sver/ever, takes minutes/precision from
// pTsdb->keepCfg, minRow/maxRow/cmprAlg from the vnode's tsdbCfg and commitID
// from the vnode state.  It then fills pWriter->fs through tsdbFSCopy() and
// creates the three block-data buffers inData, bData and sData.
//
// On success *ppWriter receives the writer and 0 is returned.  On failure
// the block-data buffers, the fs copy and the writer itself are released,
// *ppWriter is set to NULL and the error code is returned.
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) {
  int32_t          lino = 0;
  int32_t          code = 0;
  STsdbSnapWriter* pWriter = NULL;

  // zero-filled allocation, so the cleanup path below can run on a
  // partially initialised writer
  pWriter = (STsdbSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
  if (!pWriter) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // plain field setup
  pWriter->pTsdb = pTsdb;
  pWriter->sver = sver;
  pWriter->ever = ever;
  pWriter->fid = INT32_MIN;
  pWriter->minutes = pTsdb->keepCfg.days;
  pWriter->precision = pTsdb->keepCfg.precision;
  pWriter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
  pWriter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
  pWriter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
  pWriter->commitID = pTsdb->pVnode->state.commitID;

  // file-system copy owned by the writer
  code = tsdbFSCopy(pTsdb, &pWriter->fs);
  TSDB_CHECK_CODE(code, lino, _exit);

  // SNAP_DATA_TSDB: block-data buffers
  code = tBlockDataCreate(&pWriter->inData);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tBlockDataCreate(&pWriter->bData);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tBlockDataCreate(&pWriter->sData);
  TSDB_CHECK_CODE(code, lino, _exit);

  // SNAP_DATA_DEL: nothing to set up here

_exit:
  if (code == 0) {
    tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64, TD_VID(pTsdb->pVnode), __func__, sver, ever);
  } else {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
    if (pWriter != NULL) {
      // undo in reverse order of construction
      tBlockDataDestroy(&pWriter->sData);
      tBlockDataDestroy(&pWriter->bData);
      tBlockDataDestroy(&pWriter->inData);
      tsdbFSDestroy(&pWriter->fs);
      taosMemoryFree(pWriter);
      pWriter = NULL;
    }
  }

  *ppWriter = pWriter;
  return code;
}

H
Hongze Cheng 已提交
1433 1434
// Finish any in-progress output of the snapshot writer and prepare the
// file-system commit.
//
// If a data-file writer is open it is ended with tsdbSnapWriteFileDataEnd();
// if a del-file writer is open it is ended with tsdbSnapWriteDelDataEnd().
// Afterwards tsdbFSPrepareCommit() is called with the writer's fs.
//
// Returns 0 on success, otherwise the error code of the first failing step.
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter) {
  int32_t lino = 0;
  int32_t code = 0;
  STsdb*  pTsdb = pWriter->pTsdb;

  // time-series data file still being written
  if (pWriter->pDataFWriter != NULL) {
    code = tsdbSnapWriteFileDataEnd(pWriter);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // tombstone file still being written
  if (pWriter->pDelFWriter != NULL) {
    code = tsdbSnapWriteDelDataEnd(pWriter);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tsdbFSPrepareCommit(pTsdb, &pWriter->fs);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
  } else {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
1459
// Commit or roll back the snapshot write, then destroy the writer.
//
// ppWriter  in/out: the writer to close; *ppWriter is set to NULL on return.
// rollback  non-zero: call tsdbRollbackCommit();
//           zero: call tsdbFSCommit() while holding pTsdb->rwLock for writing.
//
// The writer and everything it owns (del arrays, block data, stt/block index
// containers, table schema, scratch buffers, fs copy) are released on every
// path.  Previously a failing tsdbFSCommit() jumped straight to the exit label
// and leaked all of them; now the commit error is only remembered and returned
// after teardown.
//
// Returns 0 on success, otherwise the error code from tsdbFSCommit().
//
// NOTE(review): pDataFWriter / pDelFWriter and related readers are not closed
// here -- confirm they are always ended before this call (e.g. via
// tsdbSnapWriterPrepareClose) or are otherwise handled on the rollback path.
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdbSnapWriter* pWriter = *ppWriter;
  STsdb*           pTsdb = pWriter->pTsdb;

  if (rollback) {
    tsdbRollbackCommit(pTsdb);
  } else {
    // the commit runs under the write lock; the lock is dropped whether or not it succeeds
    taosThreadRwlockWrlock(&pTsdb->rwLock);
    code = tsdbFSCommit(pTsdb);
    taosThreadRwlockUnlock(&pTsdb->rwLock);

    if (code) {
      // remember where it failed, but fall through so the writer is still freed
      lino = __LINE__;
    }
  }

  // SNAP_DATA_DEL
  taosArrayDestroy(pWriter->aDelData);
  taosArrayDestroy(pWriter->aDelIdx);

  // SNAP_DATA_TSDB
  tBlockDataDestroy(&pWriter->sData);
  tBlockDataDestroy(&pWriter->bData);
  taosArrayDestroy(pWriter->aSttBlk);
  tMapDataClear(&pWriter->mDataBlk);
  taosArrayDestroy(pWriter->aBlockIdx);
  tDestroyTSchema(pWriter->skmTable.pTSchema);
  tBlockDataDestroy(&pWriter->inData);

  // scratch buffers
  for (size_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(pWriter->aBuf[0]); iBuf++) {
    tFree(pWriter->aBuf[iBuf]);
  }

  tsdbFSDestroy(&pWriter->fs);
  taosMemoryFree(pWriter);
  *ppWriter = NULL;

  // pTsdb is owned by the caller and is still valid after the writer is freed
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
  }
  return code;
}

H
Hongze Cheng 已提交
1511 1512 1513
// Dispatch one snapshot data packet to the matching writer routine.
//
// - SNAP_DATA_TSDB: handed to tsdbSnapWriteTimeSeriesData().
// - any other type: if a data-file writer is still open it is first ended
//   with tsdbSnapWriteFileDataEnd(); then, for SNAP_DATA_DEL, the packet is
//   handed to tsdbSnapWriteDelData().  Other types perform no further work.
//
// Returns 0 on success, otherwise the error code of the failing step.
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr) {
  int32_t lino = 0;
  int32_t code = 0;

  if (pHdr->type == SNAP_DATA_TSDB) {
    code = tsdbSnapWriteTimeSeriesData(pWriter, pHdr);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    // a non-TSDB packet arrived: end the data file in progress, if there is one
    if (pWriter->pDataFWriter) {
      code = tsdbSnapWriteFileDataEnd(pWriter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    if (pHdr->type == SNAP_DATA_DEL) {
      code = tsdbSnapWriteDelData(pWriter, pHdr);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

_exit:
  if (code == 0) {
    tsdbDebug("vgId:%d %s done, type:%d index:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), __func__,
              pHdr->type, pHdr->index, pHdr->size);
  } else {
    tsdbError("vgId:%d %s failed at line %d since %s, type:%d index:%" PRId64 " size:%" PRId64,
              TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code), pHdr->type, pHdr->index, pHdr->size);
  }
  return code;
}