tsdbReaderWriter.c 44.1 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
// =============== PAGE-WISE FILE ===============
H
Hongze Cheng 已提交
19
static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD) {
H
Hongze Cheng 已提交
20
  int32_t  code = 0;
H
Hongze Cheng 已提交
21
  STsdbFD *pFD = NULL;
H
refact  
Hongze Cheng 已提交
22

H
Hongze Cheng 已提交
23 24
  *ppFD = NULL;

H
Hongze Cheng 已提交
25
  pFD = (STsdbFD *)taosMemoryCalloc(1, sizeof(*pFD) + strlen(path) + 1);
H
Hongze Cheng 已提交
26 27 28 29
  if (pFD == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }
H
Hongze Cheng 已提交
30

H
Hongze Cheng 已提交
31 32 33 34 35
  pFD->path = (char *)&pFD[1];
  strcpy(pFD->path, path);
  pFD->szPage = szPage;
  pFD->flag = flag;
  pFD->pFD = taosOpenFile(path, flag);
H
Hongze Cheng 已提交
36
  if (pFD->pFD == NULL) {
H
Hongze Cheng 已提交
37
    code = TAOS_SYSTEM_ERROR(errno);
H
Hongze Cheng 已提交
38
    taosMemoryFree(pFD);
H
Hongze Cheng 已提交
39
    goto _exit;
H
Hongze Cheng 已提交
40
  }
H
Hongze Cheng 已提交
41
  pFD->szPage = szPage;
H
Hongze Cheng 已提交
42
  pFD->pgno = 0;
H
Hongze Cheng 已提交
43
  pFD->pBuf = taosMemoryCalloc(1, szPage);
H
Hongze Cheng 已提交
44
  if (pFD->pBuf == NULL) {
H
Hongze Cheng 已提交
45
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
46
    taosCloseFile(&pFD->pFD);
H
Hongze Cheng 已提交
47
    taosMemoryFree(pFD);
H
Hongze Cheng 已提交
48
    goto _exit;
H
Hongze Cheng 已提交
49
  }
H
Hongze Cheng 已提交
50 51
  if (taosStatFile(path, &pFD->szFile, NULL) < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
H
Hongze Cheng 已提交
52 53 54
    taosMemoryFree(pFD->pBuf);
    taosCloseFile(&pFD->pFD);
    taosMemoryFree(pFD);
H
Hongze Cheng 已提交
55 56 57 58
    goto _exit;
  }
  ASSERT(pFD->szFile % szPage == 0);
  pFD->szFile = pFD->szFile / szPage;
H
Hongze Cheng 已提交
59
  *ppFD = pFD;
H
Hongze Cheng 已提交
60 61 62

_exit:
  return code;
H
Hongze Cheng 已提交
63
}
H
Hongze Cheng 已提交
64

H
Hongze Cheng 已提交
65 66
static void tsdbCloseFile(STsdbFD **ppFD) {
  STsdbFD *pFD = *ppFD;
H
Hongze Cheng 已提交
67 68 69 70 71 72
  if (pFD) {
    taosMemoryFree(pFD->pBuf);
    taosCloseFile(&pFD->pFD);
    taosMemoryFree(pFD);
    *ppFD = NULL;
  }
H
Hongze Cheng 已提交
73 74
}

H
Hongze Cheng 已提交
75
static int32_t tsdbWriteFilePage(STsdbFD *pFD) {
H
Hongze Cheng 已提交
76
  int32_t code = 0;
H
Hongze Cheng 已提交
77

H
Hongze Cheng 已提交
78 79 80 81 82 83
  if (pFD->pgno > 0) {
    int64_t n = taosLSeekFile(pFD->pFD, PAGE_OFFSET(pFD->pgno, pFD->szPage), SEEK_SET);
    if (n < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      goto _exit;
    }
H
Hongze Cheng 已提交
84

H
Hongze Cheng 已提交
85
    taosCalcChecksumAppend(0, pFD->pBuf, pFD->szPage);
H
Hongze Cheng 已提交
86

H
Hongze Cheng 已提交
87 88 89 90 91
    n = taosWriteFile(pFD->pFD, pFD->pBuf, pFD->szPage);
    if (n < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      goto _exit;
    }
H
Hongze Cheng 已提交
92

H
Hongze Cheng 已提交
93
    if (pFD->szFile < pFD->pgno) {
H
Hongze Cheng 已提交
94
      pFD->szFile = pFD->pgno;
H
Hongze Cheng 已提交
95
    }
H
Hongze Cheng 已提交
96
  }
H
Hongze Cheng 已提交
97
  pFD->pgno = 0;
H
Hongze Cheng 已提交
98 99

_exit:
H
Hongze Cheng 已提交
100 101 102
  return code;
}

H
Hongze Cheng 已提交
103
static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) {
H
Hongze Cheng 已提交
104
  int32_t code = 0;
H
more  
Hongze Cheng 已提交
105

H
Hongze Cheng 已提交
106 107
  ASSERT(pgno <= pFD->szFile);

H
Hongze Cheng 已提交
108 109 110
  // seek
  int64_t offset = PAGE_OFFSET(pgno, pFD->szPage);
  int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET);
H
Hongze Cheng 已提交
111
  if (n < 0) {
H
Hongze Cheng 已提交
112
    code = TAOS_SYSTEM_ERROR(errno);
H
Hongze Cheng 已提交
113
    goto _exit;
H
Hongze Cheng 已提交
114 115
  }

H
Hongze Cheng 已提交
116
  // read
H
Hongze Cheng 已提交
117
  n = taosReadFile(pFD->pFD, pFD->pBuf, pFD->szPage);
H
Hongze Cheng 已提交
118
  if (n < 0) {
H
Hongze Cheng 已提交
119
    code = TAOS_SYSTEM_ERROR(errno);
H
Hongze Cheng 已提交
120 121
    goto _exit;
  } else if (n < pFD->szPage) {
H
Hongze Cheng 已提交
122
    code = TSDB_CODE_FILE_CORRUPTED;
H
Hongze Cheng 已提交
123
    goto _exit;
H
Hongze Cheng 已提交
124 125
  }

H
Hongze Cheng 已提交
126
  // check
127
  if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
H
Hongze Cheng 已提交
128
    code = TSDB_CODE_FILE_CORRUPTED;
H
Hongze Cheng 已提交
129
    goto _exit;
H
Hongze Cheng 已提交
130 131
  }

H
Hongze Cheng 已提交
132
  pFD->pgno = pgno;
H
Hongze Cheng 已提交
133

H
Hongze Cheng 已提交
134 135 136
_exit:
  return code;
}
H
Hongze Cheng 已提交
137

H
Hongze Cheng 已提交
138
static int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size) {
H
Hongze Cheng 已提交
139 140 141 142 143 144 145 146 147 148 149
  int32_t code = 0;
  int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
  int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
  int64_t bOffset = fOffset % pFD->szPage;
  int64_t n = 0;

  do {
    if (pFD->pgno != pgno) {
      code = tsdbWriteFilePage(pFD);
      if (code) goto _exit;

H
Hongze Cheng 已提交
150
      if (pgno <= pFD->szFile) {
H
Hongze Cheng 已提交
151 152 153 154 155 156 157
        code = tsdbReadFilePage(pFD, pgno);
        if (code) goto _exit;
      } else {
        pFD->pgno = pgno;
      }
    }

H
Hongze Cheng 已提交
158 159
    int64_t nWrite = TMIN(PAGE_CONTENT_SIZE(pFD->szPage) - bOffset, size - n);
    memcpy(pFD->pBuf + bOffset, pBuf + n, nWrite);
H
Hongze Cheng 已提交
160 161 162

    pgno++;
    bOffset = 0;
H
Hongze Cheng 已提交
163
    n += nWrite;
H
Hongze Cheng 已提交
164 165 166 167 168 169
  } while (n < size);

_exit:
  return code;
}

H
Hongze Cheng 已提交
170
static int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
H
Hongze Cheng 已提交
171
  int32_t code = 0;
H
Hongze Cheng 已提交
172
  int64_t n = 0;
H
Hongze Cheng 已提交
173 174
  int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
  int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
H
Hongze Cheng 已提交
175
  int32_t szPgCont = PAGE_CONTENT_SIZE(pFD->szPage);
H
fix bug  
Hongze Cheng 已提交
176
  int64_t bOffset = fOffset % pFD->szPage;
H
Hongze Cheng 已提交
177

H
Hongze Cheng 已提交
178
  ASSERT(pgno && pgno <= pFD->szFile);
H
fix bug  
Hongze Cheng 已提交
179
  ASSERT(bOffset < szPgCont);
H
Hongze Cheng 已提交
180

H
Hongze Cheng 已提交
181
  while (n < size) {
H
fix bug  
Hongze Cheng 已提交
182 183 184 185
    if (pFD->pgno != pgno) {
      code = tsdbReadFilePage(pFD, pgno);
      if (code) goto _exit;
    }
H
Hongze Cheng 已提交
186

H
fix bug  
Hongze Cheng 已提交
187 188
    int64_t nRead = TMIN(szPgCont - bOffset, size - n);
    memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);
H
Hongze Cheng 已提交
189

H
Hongze Cheng 已提交
190
    n += nRead;
H
Hongze Cheng 已提交
191
    pgno++;
H
fix bug  
Hongze Cheng 已提交
192
    bOffset = 0;
H
Hongze Cheng 已提交
193 194 195
  }

_exit:
H
Hongze Cheng 已提交
196
  return code;
H
Hongze Cheng 已提交
197 198
}

H
Hongze Cheng 已提交
199
static int32_t tsdbFsyncFile(STsdbFD *pFD) {
H
Hongze Cheng 已提交
200
  int32_t code = 0;
H
Hongze Cheng 已提交
201 202 203 204 205 206 207 208 209 210

  code = tsdbWriteFilePage(pFD);
  if (code) goto _exit;

  if (taosFsyncFile(pFD->pFD) < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _exit;
  }

_exit:
H
Hongze Cheng 已提交
211 212 213
  return code;
}

H
Hongze Cheng 已提交
214 215
// SDataFWriter ====================================================
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet) {
H
Hongze Cheng 已提交
216
  int32_t       code = 0;
H
Hongze Cheng 已提交
217 218
  int32_t       flag;
  int64_t       n;
H
Hongze Cheng 已提交
219
  int32_t       szPage = pTsdb->pVnode->config.tsdbPageSize;
H
Hongze Cheng 已提交
220
  SDataFWriter *pWriter = NULL;
H
Hongze Cheng 已提交
221
  char          fname[TSDB_FILENAME_LEN];
H
Hongze Cheng 已提交
222
  char          hdr[TSDB_FHDR_SIZE] = {0};
H
Hongze Cheng 已提交
223 224

  // alloc
H
Hongze Cheng 已提交
225 226
  pWriter = taosMemoryCalloc(1, sizeof(*pWriter));
  if (pWriter == NULL) {
H
Hongze Cheng 已提交
227 228 229
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
H
Hongze Cheng 已提交
230
  pWriter->pTsdb = pTsdb;
H
Hongze Cheng 已提交
231 232 233 234 235
  pWriter->wSet = (SDFileSet){.diskId = pSet->diskId,
                              .fid = pSet->fid,
                              .pHeadF = &pWriter->fHead,
                              .pDataF = &pWriter->fData,
                              .pSmaF = &pWriter->fSma,
H
Hongze Cheng 已提交
236
                              .nSttF = pSet->nSttF};
H
Hongze Cheng 已提交
237 238 239
  pWriter->fHead = *pSet->pHeadF;
  pWriter->fData = *pSet->pDataF;
  pWriter->fSma = *pSet->pSmaF;
H
Hongze Cheng 已提交
240 241 242
  for (int8_t iStt = 0; iStt < pSet->nSttF; iStt++) {
    pWriter->wSet.aSttF[iStt] = &pWriter->fStt[iStt];
    pWriter->fStt[iStt] = *pSet->aSttF[iStt];
H
Hongze Cheng 已提交
243
  }
H
Hongze Cheng 已提交
244 245

  // head
H
Hongze Cheng 已提交
246
  flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
H
Hongze Cheng 已提交
247
  tsdbHeadFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fHead, fname);
H
Hongze Cheng 已提交
248 249
  code = tsdbOpenFile(fname, szPage, flag, &pWriter->pHeadFD);
  if (code) goto _err;
H
Hongze Cheng 已提交
250

H
Hongze Cheng 已提交
251
  code = tsdbWriteFile(pWriter->pHeadFD, 0, hdr, TSDB_FHDR_SIZE);
H
Hongze Cheng 已提交
252
  if (code) goto _err;
H
Hongze Cheng 已提交
253 254
  pWriter->fHead.size += TSDB_FHDR_SIZE;

H
Hongze Cheng 已提交
255
  // data
H
Hongze Cheng 已提交
256
  if (pWriter->fData.size == 0) {
H
Hongze Cheng 已提交
257
    flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
H
Hongze Cheng 已提交
258
  } else {
H
Hongze Cheng 已提交
259
    flag = TD_FILE_READ | TD_FILE_WRITE;
H
Hongze Cheng 已提交
260 261
  }
  tsdbDataFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fData, fname);
H
Hongze Cheng 已提交
262 263
  code = tsdbOpenFile(fname, szPage, flag, &pWriter->pDataFD);
  if (code) goto _err;
H
Hongze Cheng 已提交
264
  if (pWriter->fData.size == 0) {
H
Hongze Cheng 已提交
265
    code = tsdbWriteFile(pWriter->pDataFD, 0, hdr, TSDB_FHDR_SIZE);
H
Hongze Cheng 已提交
266
    if (code) goto _err;
H
Hongze Cheng 已提交
267 268
    pWriter->fData.size += TSDB_FHDR_SIZE;
  }
H
Hongze Cheng 已提交
269 270

  // sma
H
Hongze Cheng 已提交
271
  if (pWriter->fSma.size == 0) {
H
Hongze Cheng 已提交
272
    flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
H
Hongze Cheng 已提交
273
  } else {
H
Hongze Cheng 已提交
274
    flag = TD_FILE_READ | TD_FILE_WRITE;
H
Hongze Cheng 已提交
275 276
  }
  tsdbSmaFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSma, fname);
H
Hongze Cheng 已提交
277 278
  code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSmaFD);
  if (code) goto _err;
H
Hongze Cheng 已提交
279
  if (pWriter->fSma.size == 0) {
H
Hongze Cheng 已提交
280
    code = tsdbWriteFile(pWriter->pSmaFD, 0, hdr, TSDB_FHDR_SIZE);
H
Hongze Cheng 已提交
281
    if (code) goto _err;
H
Hongze Cheng 已提交
282

H
Hongze Cheng 已提交
283
    pWriter->fSma.size += TSDB_FHDR_SIZE;
H
Hongze Cheng 已提交
284 285
  }

H
Hongze Cheng 已提交
286 287
  // stt
  ASSERT(pWriter->fStt[pSet->nSttF - 1].size == 0);
H
Hongze Cheng 已提交
288
  flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
H
Hongze Cheng 已提交
289 290
  tsdbSttFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fStt[pSet->nSttF - 1], fname);
  code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSttFD);
H
Hongze Cheng 已提交
291
  if (code) goto _err;
H
Hongze Cheng 已提交
292
  code = tsdbWriteFile(pWriter->pSttFD, 0, hdr, TSDB_FHDR_SIZE);
H
Hongze Cheng 已提交
293
  if (code) goto _err;
H
Hongze Cheng 已提交
294
  pWriter->fStt[pWriter->wSet.nSttF - 1].size += TSDB_FHDR_SIZE;
H
Hongze Cheng 已提交
295 296

  *ppWriter = pWriter;
H
Hongze Cheng 已提交
297 298 299
  return code;

_err:
H
Hongze Cheng 已提交
300 301
  tsdbError("vgId:%d, tsdb data file writer open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  *ppWriter = NULL;
H
Hongze Cheng 已提交
302 303 304
  return code;
}

H
Hongze Cheng 已提交
305
int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) {
H
Hongze Cheng 已提交
306
  int32_t code = 0;
H
Hongze Cheng 已提交
307
  STsdb  *pTsdb = NULL;
H
Hongze Cheng 已提交
308

H
Hongze Cheng 已提交
309 310 311 312
  if (*ppWriter == NULL) goto _exit;

  pTsdb = (*ppWriter)->pTsdb;
  if (sync) {
H
Hongze Cheng 已提交
313 314
    code = tsdbFsyncFile((*ppWriter)->pHeadFD);
    if (code) goto _err;
H
Hongze Cheng 已提交
315

H
Hongze Cheng 已提交
316 317
    code = tsdbFsyncFile((*ppWriter)->pDataFD);
    if (code) goto _err;
H
Hongze Cheng 已提交
318

H
Hongze Cheng 已提交
319 320
    code = tsdbFsyncFile((*ppWriter)->pSmaFD);
    if (code) goto _err;
H
Hongze Cheng 已提交
321

H
Hongze Cheng 已提交
322
    code = tsdbFsyncFile((*ppWriter)->pSttFD);
H
Hongze Cheng 已提交
323
    if (code) goto _err;
H
Hongze Cheng 已提交
324 325
  }

H
Hongze Cheng 已提交
326 327 328
  tsdbCloseFile(&(*ppWriter)->pHeadFD);
  tsdbCloseFile(&(*ppWriter)->pDataFD);
  tsdbCloseFile(&(*ppWriter)->pSmaFD);
H
Hongze Cheng 已提交
329
  tsdbCloseFile(&(*ppWriter)->pSttFD);
H
Hongze Cheng 已提交
330

H
Hongze Cheng 已提交
331 332
  for (int32_t iBuf = 0; iBuf < sizeof((*ppWriter)->aBuf) / sizeof(uint8_t *); iBuf++) {
    tFree((*ppWriter)->aBuf[iBuf]);
H
Hongze Cheng 已提交
333
  }
H
Hongze Cheng 已提交
334
  taosMemoryFree(*ppWriter);
H
Hongze Cheng 已提交
335
_exit:
H
Hongze Cheng 已提交
336
  *ppWriter = NULL;
H
Hongze Cheng 已提交
337 338 339
  return code;

_err:
H
Hongze Cheng 已提交
340
  tsdbError("vgId:%d, data file writer close failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
341 342 343
  return code;
}

H
Hongze Cheng 已提交
344 345 346 347
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter) {
  int32_t code = 0;
  int64_t n;
  char    hdr[TSDB_FHDR_SIZE];
H
Hongze Cheng 已提交
348

H
Hongze Cheng 已提交
349 350 351
  // head ==============
  memset(hdr, 0, TSDB_FHDR_SIZE);
  tPutHeadFile(hdr, &pWriter->fHead);
H
Hongze Cheng 已提交
352
  code = tsdbWriteFile(pWriter->pHeadFD, 0, hdr, TSDB_FHDR_SIZE);
H
Hongze Cheng 已提交
353
  if (code) goto _err;
H
Hongze Cheng 已提交
354 355 356 357

  // data ==============
  memset(hdr, 0, TSDB_FHDR_SIZE);
  tPutDataFile(hdr, &pWriter->fData);
H
Hongze Cheng 已提交
358
  code = tsdbWriteFile(pWriter->pDataFD, 0, hdr, TSDB_FHDR_SIZE);
H
Hongze Cheng 已提交
359
  if (code) goto _err;
H
Hongze Cheng 已提交
360

H
Hongze Cheng 已提交
361 362 363
  // sma ==============
  memset(hdr, 0, TSDB_FHDR_SIZE);
  tPutSmaFile(hdr, &pWriter->fSma);
H
Hongze Cheng 已提交
364
  code = tsdbWriteFile(pWriter->pSmaFD, 0, hdr, TSDB_FHDR_SIZE);
H
Hongze Cheng 已提交
365
  if (code) goto _err;
H
Hongze Cheng 已提交
366

H
Hongze Cheng 已提交
367
  // stt ==============
H
Hongze Cheng 已提交
368
  memset(hdr, 0, TSDB_FHDR_SIZE);
H
Hongze Cheng 已提交
369 370
  tPutSttFile(hdr, &pWriter->fStt[pWriter->wSet.nSttF - 1]);
  code = tsdbWriteFile(pWriter->pSttFD, 0, hdr, TSDB_FHDR_SIZE);
H
Hongze Cheng 已提交
371
  if (code) goto _err;
H
Hongze Cheng 已提交
372 373 374 375

  return code;

_err:
H
Hongze Cheng 已提交
376
  tsdbError("vgId:%d, update DFileSet header failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
377 378 379
  return code;
}

H
Hongze Cheng 已提交
380 381 382
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx) {
  int32_t    code = 0;
  SHeadFile *pHeadFile = &pWriter->fHead;
H
Hongze Cheng 已提交
383
  int64_t    size;
H
Hongze Cheng 已提交
384
  int64_t    n;
H
Hongze Cheng 已提交
385

H
Hongze Cheng 已提交
386 387 388
  // check
  if (taosArrayGetSize(aBlockIdx) == 0) {
    pHeadFile->offset = pHeadFile->size;
H
Hongze Cheng 已提交
389 390
    goto _exit;
  }
H
Hongze Cheng 已提交
391

H
Hongze Cheng 已提交
392
  // prepare
H
Hongze Cheng 已提交
393
  size = 0;
H
Hongze Cheng 已提交
394 395 396 397
  for (int32_t iBlockIdx = 0; iBlockIdx < taosArrayGetSize(aBlockIdx); iBlockIdx++) {
    size += tPutBlockIdx(NULL, taosArrayGet(aBlockIdx, iBlockIdx));
  }

H
Hongze Cheng 已提交
398
  // alloc
H
Hongze Cheng 已提交
399
  code = tRealloc(&pWriter->aBuf[0], size);
H
Hongze Cheng 已提交
400 401
  if (code) goto _err;

H
Hongze Cheng 已提交
402 403 404 405
  // build
  n = 0;
  for (int32_t iBlockIdx = 0; iBlockIdx < taosArrayGetSize(aBlockIdx); iBlockIdx++) {
    n += tPutBlockIdx(pWriter->aBuf[0] + n, taosArrayGet(aBlockIdx, iBlockIdx));
H
Hongze Cheng 已提交
406
  }
H
Hongze Cheng 已提交
407
  ASSERT(n == size);
H
Hongze Cheng 已提交
408 409

  // write
H
Hongze Cheng 已提交
410
  code = tsdbWriteFile(pWriter->pHeadFD, pHeadFile->size, pWriter->aBuf[0], size);
H
Hongze Cheng 已提交
411
  if (code) goto _err;
H
Hongze Cheng 已提交
412

H
Hongze Cheng 已提交
413 414 415
  // update
  pHeadFile->offset = pHeadFile->size;
  pHeadFile->size += size;
H
Hongze Cheng 已提交
416

H
Hongze Cheng 已提交
417
_exit:
S
Shengliang Guan 已提交
418
  // tsdbTrace("vgId:%d, write block idx, offset:%" PRId64 " size:%" PRId64 " nBlockIdx:%d",
H
Hongze Cheng 已提交
419 420
  // TD_VID(pWriter->pTsdb->pVnode),
  //           pHeadFile->offset, size, taosArrayGetSize(aBlockIdx));
H
Hongze Cheng 已提交
421 422 423
  return code;

_err:
H
Hongze Cheng 已提交
424
  tsdbError("vgId:%d, write block idx failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
425 426 427
  return code;
}

H
Hongze Cheng 已提交
428
int32_t tsdbWriteDataBlk(SDataFWriter *pWriter, SMapData *mDataBlk, SBlockIdx *pBlockIdx) {
H
Hongze Cheng 已提交
429 430 431 432 433
  int32_t    code = 0;
  SHeadFile *pHeadFile = &pWriter->fHead;
  int64_t    size;
  int64_t    n;

H
Hongze Cheng 已提交
434
  ASSERT(mDataBlk->nItem > 0);
H
Hongze Cheng 已提交
435

H
Hongze Cheng 已提交
436
  // alloc
H
Hongze Cheng 已提交
437
  size = tPutMapData(NULL, mDataBlk);
H
Hongze Cheng 已提交
438
  code = tRealloc(&pWriter->aBuf[0], size);
H
Hongze Cheng 已提交
439 440
  if (code) goto _err;

H
Hongze Cheng 已提交
441
  // build
H
Hongze Cheng 已提交
442
  n = tPutMapData(pWriter->aBuf[0], mDataBlk);
H
Hongze Cheng 已提交
443 444

  // write
H
Hongze Cheng 已提交
445
  code = tsdbWriteFile(pWriter->pHeadFD, pHeadFile->size, pWriter->aBuf[0], size);
H
Hongze Cheng 已提交
446
  if (code) goto _err;
H
Hongze Cheng 已提交
447

H
Hongze Cheng 已提交
448 449 450 451
  // update
  pBlockIdx->offset = pHeadFile->size;
  pBlockIdx->size = size;
  pHeadFile->size += size;
H
Hongze Cheng 已提交
452

H
Hongze Cheng 已提交
453
  tsdbTrace("vgId:%d, write block, file ID:%d commit ID:%" PRId64 " suid:%" PRId64 " uid:%" PRId64 " offset:%" PRId64
H
Hongze Cheng 已提交
454 455
            " size:%" PRId64 " nItem:%d",
            TD_VID(pWriter->pTsdb->pVnode), pWriter->wSet.fid, pHeadFile->commitID, pBlockIdx->suid, pBlockIdx->uid,
H
Hongze Cheng 已提交
456
            pBlockIdx->offset, pBlockIdx->size, mDataBlk->nItem);
H
Hongze Cheng 已提交
457 458 459
  return code;

_err:
H
Hongze Cheng 已提交
460
  tsdbError("vgId:%d, write block failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
461 462 463
  return code;
}

H
Hongze Cheng 已提交
464
int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk) {
H
Hongze Cheng 已提交
465
  int32_t   code = 0;
H
Hongze Cheng 已提交
466
  SSttFile *pSttFile = &pWriter->fStt[pWriter->wSet.nSttF - 1];
H
Hongze Cheng 已提交
467
  int64_t   size = 0;
H
Hongze Cheng 已提交
468
  int64_t   n;
H
Hongze Cheng 已提交
469

H
Hongze Cheng 已提交
470
  // check
H
Hongze Cheng 已提交
471 472
  if (taosArrayGetSize(aSttBlk) == 0) {
    pSttFile->offset = pSttFile->size;
H
Hongze Cheng 已提交
473 474
    goto _exit;
  }
H
Hongze Cheng 已提交
475

H
Hongze Cheng 已提交
476
  // size
H
Hongze Cheng 已提交
477
  size = 0;
H
Hongze Cheng 已提交
478 479
  for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aSttBlk); iBlockL++) {
    size += tPutSttBlk(NULL, taosArrayGet(aSttBlk, iBlockL));
H
Hongze Cheng 已提交
480
  }
H
Hongze Cheng 已提交
481 482

  // alloc
H
Hongze Cheng 已提交
483
  code = tRealloc(&pWriter->aBuf[0], size);
H
Hongze Cheng 已提交
484 485
  if (code) goto _err;

H
Hongze Cheng 已提交
486 487
  // encode
  n = 0;
H
Hongze Cheng 已提交
488 489
  for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aSttBlk); iBlockL++) {
    n += tPutSttBlk(pWriter->aBuf[0] + n, taosArrayGet(aSttBlk, iBlockL));
H
Hongze Cheng 已提交
490
  }
H
Hongze Cheng 已提交
491 492

  // write
H
Hongze Cheng 已提交
493
  code = tsdbWriteFile(pWriter->pSttFD, pSttFile->size, pWriter->aBuf[0], size);
H
Hongze Cheng 已提交
494
  if (code) goto _err;
H
Hongze Cheng 已提交
495

H
Hongze Cheng 已提交
496
  // update
H
Hongze Cheng 已提交
497 498
  pSttFile->offset = pSttFile->size;
  pSttFile->size += size;
H
Hongze Cheng 已提交
499

H
Hongze Cheng 已提交
500
_exit:
S
Shengliang Guan 已提交
501
  tsdbTrace("vgId:%d, tsdb write stt block, loffset:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode),
H
Hongze Cheng 已提交
502
            pSttFile->offset, size);
H
Hongze Cheng 已提交
503 504 505
  return code;

_err:
S
Shengliang Guan 已提交
506
  tsdbError("vgId:%d, tsdb write blockl failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
507 508 509
  return code;
}

H
Hongze Cheng 已提交
510
static int32_t tsdbWriteBlockSma(SDataFWriter *pWriter, SBlockData *pBlockData, SSmaInfo *pSmaInfo) {
H
Hongze Cheng 已提交
511 512
  int32_t code = 0;

H
Hongze Cheng 已提交
513 514
  pSmaInfo->offset = 0;
  pSmaInfo->size = 0;
H
Hongze Cheng 已提交
515

H
Hongze Cheng 已提交
516 517 518
  // encode
  for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
    SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
H
Hongze Cheng 已提交
519

H
Hongze Cheng 已提交
520
    if ((!pColData->smaOn) || IS_VAR_DATA_TYPE(pColData->type)) continue;
H
Hongze Cheng 已提交
521

H
Hongze Cheng 已提交
522 523
    SColumnDataAgg sma = {.colId = pColData->cid};
    tColDataCalcSMA[pColData->type](pColData, &sma.sum, &sma.max, &sma.min, &sma.numOfNull);
H
Hongze Cheng 已提交
524

H
Hongze Cheng 已提交
525 526 527 528
    code = tRealloc(&pWriter->aBuf[0], pSmaInfo->size + tPutColumnDataAgg(NULL, &sma));
    if (code) goto _err;
    pSmaInfo->size += tPutColumnDataAgg(pWriter->aBuf[0] + pSmaInfo->size, &sma);
  }
H
Hongze Cheng 已提交
529

H
Hongze Cheng 已提交
530 531
  // write
  if (pSmaInfo->size) {
H
Hongze Cheng 已提交
532
    code = tsdbWriteFile(pWriter->pSmaFD, pWriter->fSma.size, pWriter->aBuf[0], pSmaInfo->size);
H
Hongze Cheng 已提交
533
    if (code) goto _err;
H
Hongze Cheng 已提交
534

H
Hongze Cheng 已提交
535
    pSmaInfo->offset = pWriter->fSma.size;
H
Hongze Cheng 已提交
536
    pWriter->fSma.size += pSmaInfo->size;
H
Hongze Cheng 已提交
537 538 539 540 541
  }

  return code;

_err:
S
Shengliang Guan 已提交
542
  tsdbError("vgId:%d, tsdb write block sma failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
543 544 545
  return code;
}

H
Hongze Cheng 已提交
546 547
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo,
                           int8_t cmprAlg, int8_t toLast) {
H
Hongze Cheng 已提交
548 549
  int32_t code = 0;

H
Hongze Cheng 已提交
550
  ASSERT(pBlockData->nRow > 0);
H
Hongze Cheng 已提交
551

H
Hongze Cheng 已提交
552
  if (toLast) {
H
Hongze Cheng 已提交
553
    pBlkInfo->offset = pWriter->fStt[pWriter->wSet.nSttF - 1].size;
H
Hongze Cheng 已提交
554 555 556
  } else {
    pBlkInfo->offset = pWriter->fData.size;
  }
H
Hongze Cheng 已提交
557 558
  pBlkInfo->szBlock = 0;
  pBlkInfo->szKey = 0;
H
Hongze Cheng 已提交
559

H
Hongze Cheng 已提交
560 561 562
  int32_t aBufN[4] = {0};
  code = tCmprBlockData(pBlockData, cmprAlg, NULL, NULL, pWriter->aBuf, aBufN);
  if (code) goto _err;
H
Hongze Cheng 已提交
563

H
Hongze Cheng 已提交
564
  // write =================
H
Hongze Cheng 已提交
565
  STsdbFD *pFD = toLast ? pWriter->pSttFD : pWriter->pDataFD;
H
Hongze Cheng 已提交
566

H
Hongze Cheng 已提交
567 568
  pBlkInfo->szKey = aBufN[3] + aBufN[2];
  pBlkInfo->szBlock = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3];
H
Hongze Cheng 已提交
569

H
Hongze Cheng 已提交
570 571
  int64_t offset = pBlkInfo->offset;
  code = tsdbWriteFile(pFD, offset, pWriter->aBuf[3], aBufN[3]);
H
Hongze Cheng 已提交
572
  if (code) goto _err;
H
Hongze Cheng 已提交
573
  offset += aBufN[3];
H
Hongze Cheng 已提交
574

H
Hongze Cheng 已提交
575
  code = tsdbWriteFile(pFD, offset, pWriter->aBuf[2], aBufN[2]);
H
Hongze Cheng 已提交
576
  if (code) goto _err;
H
Hongze Cheng 已提交
577
  offset += aBufN[2];
H
Hongze Cheng 已提交
578

H
Hongze Cheng 已提交
579
  if (aBufN[1]) {
H
Hongze Cheng 已提交
580
    code = tsdbWriteFile(pFD, offset, pWriter->aBuf[1], aBufN[1]);
H
Hongze Cheng 已提交
581
    if (code) goto _err;
H
Hongze Cheng 已提交
582
    offset += aBufN[1];
H
Hongze Cheng 已提交
583
  }
H
Hongze Cheng 已提交
584

H
Hongze Cheng 已提交
585
  if (aBufN[0]) {
H
Hongze Cheng 已提交
586
    code = tsdbWriteFile(pFD, offset, pWriter->aBuf[0], aBufN[0]);
H
Hongze Cheng 已提交
587
    if (code) goto _err;
H
Hongze Cheng 已提交
588
  }
H
Hongze Cheng 已提交
589

H
Hongze Cheng 已提交
590 591
  // update info
  if (toLast) {
H
Hongze Cheng 已提交
592
    pWriter->fStt[pWriter->wSet.nSttF - 1].size += pBlkInfo->szBlock;
H
Hongze Cheng 已提交
593 594 595
  } else {
    pWriter->fData.size += pBlkInfo->szBlock;
  }
H
Hongze Cheng 已提交
596

H
Hongze Cheng 已提交
597 598 599 600 601
  // ================= SMA ====================
  if (pSmaInfo) {
    code = tsdbWriteBlockSma(pWriter, pBlockData, pSmaInfo);
    if (code) goto _err;
  }
H
Hongze Cheng 已提交
602

H
Hongze Cheng 已提交
603
_exit:
S
Shengliang Guan 已提交
604
  tsdbTrace("vgId:%d, tsdb write block data, suid:%" PRId64 " uid:%" PRId64 " nRow:%d, offset:%" PRId64 " size:%d",
H
Hongze Cheng 已提交
605 606
            TD_VID(pWriter->pTsdb->pVnode), pBlockData->suid, pBlockData->uid, pBlockData->nRow, pBlkInfo->offset,
            pBlkInfo->szBlock);
H
Hongze Cheng 已提交
607 608 609
  return code;

_err:
S
Shengliang Guan 已提交
610
  tsdbError("vgId:%d, tsdb write block data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
611 612
  return code;
}
H
Hongze Cheng 已提交
613

H
Hongze Cheng 已提交
614
int32_t tsdbWriteDiskData(SDataFWriter *pWriter, const SDiskData *pDiskData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo) {
H
Hongze Cheng 已提交
615 616 617
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
618 619 620 621 622 623 624 625 626 627
  STsdbFD *pFD = NULL;
  if (pSmaInfo) {
    pFD = pWriter->pDataFD;
    pBlkInfo->offset = pWriter->fData.size;
  } else {
    pFD = pWriter->pSttFD;
    pBlkInfo->offset = pWriter->fStt[pWriter->wSet.nSttF - 1].size;
  }
  pBlkInfo->szBlock = 0;
  pBlkInfo->szKey = 0;
H
Hongze Cheng 已提交
628 629 630 631 632 633 634 635

  // hdr
  int32_t n = tPutDiskDataHdr(NULL, &pDiskData->hdr);
  code = tRealloc(&pWriter->aBuf[0], n);
  TSDB_CHECK_CODE(code, lino, _exit);

  tPutDiskDataHdr(pWriter->aBuf[0], &pDiskData->hdr);

H
Hongze Cheng 已提交
636
  code = tsdbWriteFile(pFD, pBlkInfo->offset, pWriter->aBuf[0], n);
H
Hongze Cheng 已提交
637
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
638 639
  pBlkInfo->szKey += n;
  pBlkInfo->szBlock += n;
H
Hongze Cheng 已提交
640 641

  // uid + ver + key
H
Hongze Cheng 已提交
642 643
  if (pDiskData->pUid) {
    code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskData->pUid, pDiskData->hdr.szUid);
H
Hongze Cheng 已提交
644
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
645 646
    pBlkInfo->szKey += pDiskData->hdr.szUid;
    pBlkInfo->szBlock += pDiskData->hdr.szUid;
H
Hongze Cheng 已提交
647 648
  }

H
Hongze Cheng 已提交
649
  code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskData->pVer, pDiskData->hdr.szVer);
H
Hongze Cheng 已提交
650
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
651 652
  pBlkInfo->szKey += pDiskData->hdr.szVer;
  pBlkInfo->szBlock += pDiskData->hdr.szVer;
H
Hongze Cheng 已提交
653

H
Hongze Cheng 已提交
654
  code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskData->pKey, pDiskData->hdr.szKey);
H
Hongze Cheng 已提交
655
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
656 657
  pBlkInfo->szKey += pDiskData->hdr.szKey;
  pBlkInfo->szBlock += pDiskData->hdr.szKey;
H
Hongze Cheng 已提交
658

H
Hongze Cheng 已提交
659
  // aBlockCol
H
Hongze Cheng 已提交
660 661 662 663 664 665 666
  if (pDiskData->hdr.szBlkCol) {
    code = tRealloc(&pWriter->aBuf[0], pDiskData->hdr.szBlkCol);
    TSDB_CHECK_CODE(code, lino, _exit);

    n = 0;
    for (int32_t iDiskCol = 0; iDiskCol < taosArrayGetSize(pDiskData->aDiskCol); iDiskCol++) {
      SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol);
H
Hongze Cheng 已提交
667
      n += tPutBlockCol(pWriter->aBuf[0] + n, pDiskCol);
H
Hongze Cheng 已提交
668 669 670
    }
    ASSERT(n == pDiskData->hdr.szBlkCol);

H
Hongze Cheng 已提交
671
    code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pWriter->aBuf[0], pDiskData->hdr.szBlkCol);
H
Hongze Cheng 已提交
672 673
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
674
    pBlkInfo->szBlock += pDiskData->hdr.szBlkCol;
H
Hongze Cheng 已提交
675 676
  }

H
Hongze Cheng 已提交
677
  // aDiskCol
H
Hongze Cheng 已提交
678 679 680
  for (int32_t iDiskCol = 0; iDiskCol < taosArrayGetSize(pDiskData->aDiskCol); iDiskCol++) {
    SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol);

H
Hongze Cheng 已提交
681 682
    if (pDiskCol->pBit) {
      code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskCol->pBit, pDiskCol->bCol.szBitmap);
H
Hongze Cheng 已提交
683
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
684 685

      pBlkInfo->szBlock += pDiskCol->bCol.szBitmap;
H
Hongze Cheng 已提交
686 687
    }

H
Hongze Cheng 已提交
688 689
    if (pDiskCol->pOff) {
      code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskCol->pOff, pDiskCol->bCol.szOffset);
H
Hongze Cheng 已提交
690
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
691 692

      pBlkInfo->szBlock += pDiskCol->bCol.szOffset;
H
Hongze Cheng 已提交
693 694
    }

H
Hongze Cheng 已提交
695 696
    if (pDiskCol->pVal) {
      code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskCol->pVal, pDiskCol->bCol.szValue);
H
Hongze Cheng 已提交
697
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
698 699

      pBlkInfo->szBlock += pDiskCol->bCol.szValue;
H
Hongze Cheng 已提交
700 701 702
    }
  }

H
Hongze Cheng 已提交
703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732
  if (pSmaInfo) {
    pWriter->fData.size += pBlkInfo->szBlock;
  } else {
    pWriter->fStt[pWriter->wSet.nSttF - 1].size += pBlkInfo->szBlock;
    goto _exit;
  }

  pSmaInfo->offset = 0;
  pSmaInfo->size = 0;
  for (int32_t iDiskCol = 0; iDiskCol < taosArrayGetSize(pDiskData->aDiskCol); iDiskCol++) {
    SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol);

    if (IS_VAR_DATA_TYPE(pDiskCol->bCol.type)) continue;
    if (pDiskCol->bCol.flag == HAS_NULL || pDiskCol->bCol.flag == (HAS_NULL | HAS_NONE)) continue;
    if (!pDiskCol->bCol.smaOn) continue;

    code = tRealloc(&pWriter->aBuf[0], pSmaInfo->size + tPutColumnDataAgg(NULL, &pDiskCol->agg));
    TSDB_CHECK_CODE(code, lino, _exit);
    pSmaInfo->size += tPutColumnDataAgg(pWriter->aBuf[0] + pSmaInfo->size, &pDiskCol->agg);
  }

  if (pSmaInfo->size) {
    pSmaInfo->offset = pWriter->fSma.size;

    code = tsdbWriteFile(pWriter->pSmaFD, pSmaInfo->offset, pWriter->aBuf[0], pSmaInfo->size);
    TSDB_CHECK_CODE(code, lino, _exit);

    pWriter->fSma.size += pSmaInfo->size;
  }

H
Hongze Cheng 已提交
733 734
_exit:
  if (code) {
S
Shengliang Guan 已提交
735
    tsdbError("vgId:%d, %s failed at %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
736 737 738 739
  }
  return code;
}

H
Hongze Cheng 已提交
740 741 742 743
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
  int32_t   code = 0;
  int64_t   n;
  int64_t   size;
H
Hongze Cheng 已提交
744 745 746
  TdFilePtr pOutFD = NULL;
  TdFilePtr PInFD = NULL;
  int32_t   szPage = pTsdb->pVnode->config.szPage;
H
Hongze Cheng 已提交
747 748
  char      fNameFrom[TSDB_FILENAME_LEN];
  char      fNameTo[TSDB_FILENAME_LEN];
H
Hongze Cheng 已提交
749

H
Hongze Cheng 已提交
750 751 752 753 754 755
  // head
  tsdbHeadFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pHeadF, fNameFrom);
  tsdbHeadFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pHeadF, fNameTo);
  pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (pOutFD == NULL) {
    code = TAOS_SYSTEM_ERROR(errno);
H
Hongze Cheng 已提交
756 757
    goto _err;
  }
H
Hongze Cheng 已提交
758 759
  PInFD = taosOpenFile(fNameFrom, TD_FILE_READ);
  if (PInFD == NULL) {
H
Hongze Cheng 已提交
760 761 762
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
H
Hongze Cheng 已提交
763
  n = taosFSendFile(pOutFD, PInFD, 0, tsdbLogicToFileSize(pSetFrom->pHeadF->size, szPage));
H
Hongze Cheng 已提交
764 765 766 767
  if (n < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
H
Hongze Cheng 已提交
768 769
  taosCloseFile(&pOutFD);
  taosCloseFile(&PInFD);
H
Hongze Cheng 已提交
770 771

  // data
H
Hongze Cheng 已提交
772 773 774 775 776 777
  tsdbDataFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pDataF, fNameFrom);
  tsdbDataFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pDataF, fNameTo);
  pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (pOutFD == NULL) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
H
Hongze Cheng 已提交
778
  }
H
Hongze Cheng 已提交
779 780
  PInFD = taosOpenFile(fNameFrom, TD_FILE_READ);
  if (PInFD == NULL) {
H
Hongze Cheng 已提交
781 782 783
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
H
Hongze Cheng 已提交
784
  n = taosFSendFile(pOutFD, PInFD, 0, LOGIC_TO_FILE_OFFSET(pSetFrom->pDataF->size, szPage));
H
Hongze Cheng 已提交
785
  if (n < 0) {
H
Hongze Cheng 已提交
786 787 788
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
H
Hongze Cheng 已提交
789 790
  taosCloseFile(&pOutFD);
  taosCloseFile(&PInFD);
H
Hongze Cheng 已提交
791

H
Hongze Cheng 已提交
792 793 794 795 796
  // sma
  tsdbSmaFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pSmaF, fNameFrom);
  tsdbSmaFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pSmaF, fNameTo);
  pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (pOutFD == NULL) {
H
Hongze Cheng 已提交
797 798 799
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
H
Hongze Cheng 已提交
800 801
  PInFD = taosOpenFile(fNameFrom, TD_FILE_READ);
  if (PInFD == NULL) {
H
Hongze Cheng 已提交
802 803 804
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
H
Hongze Cheng 已提交
805
  n = taosFSendFile(pOutFD, PInFD, 0, tsdbLogicToFileSize(pSetFrom->pSmaF->size, szPage));
H
Hongze Cheng 已提交
806 807 808
  if (n < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
H
Hongze Cheng 已提交
809
  }
H
Hongze Cheng 已提交
810 811 812
  taosCloseFile(&pOutFD);
  taosCloseFile(&PInFD);

H
Hongze Cheng 已提交
813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835
  // stt
  for (int8_t iStt = 0; iStt < pSetFrom->nSttF; iStt++) {
    tsdbSttFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->aSttF[iStt], fNameFrom);
    tsdbSttFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->aSttF[iStt], fNameTo);
    pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
    if (pOutFD == NULL) {
      code = TAOS_SYSTEM_ERROR(errno);
      goto _err;
    }
    PInFD = taosOpenFile(fNameFrom, TD_FILE_READ);
    if (PInFD == NULL) {
      code = TAOS_SYSTEM_ERROR(errno);
      goto _err;
    }
    n = taosFSendFile(pOutFD, PInFD, 0, tsdbLogicToFileSize(pSetFrom->aSttF[iStt]->size, szPage));
    if (n < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      goto _err;
    }
    taosCloseFile(&pOutFD);
    taosCloseFile(&PInFD);
  }

H
Hongze Cheng 已提交
836 837 838
  return code;

_err:
H
Hongze Cheng 已提交
839
  tsdbError("vgId:%d, tsdb DFileSet copy failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
840 841 842
  return code;
}

H
Hongze Cheng 已提交
843 844 845
// SDataFReader ====================================================
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet) {
  int32_t       code = 0;
H
Hongze Cheng 已提交
846 847
  int32_t       lino = 0;
  SDataFReader *pReader = NULL;
H
Hongze Cheng 已提交
848
  int32_t       szPage = pTsdb->pVnode->config.tsdbPageSize;
H
Hongze Cheng 已提交
849
  char          fname[TSDB_FILENAME_LEN];
H
Hongze Cheng 已提交
850

H
Hongze Cheng 已提交
851 852 853 854
  // alloc
  pReader = (SDataFReader *)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
855
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
856
  }
H
Hongze Cheng 已提交
857 858
  pReader->pTsdb = pTsdb;
  pReader->pSet = pSet;
H
Hongze Cheng 已提交
859

H
Hongze Cheng 已提交
860 861
  // head
  tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
H
Hongze Cheng 已提交
862
  code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pHeadFD);
H
Hongze Cheng 已提交
863
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
864

H
Hongze Cheng 已提交
865 866
  // data
  tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
H
Hongze Cheng 已提交
867
  code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pDataFD);
H
Hongze Cheng 已提交
868
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
869

H
Hongze Cheng 已提交
870 871
  // sma
  tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
H
Hongze Cheng 已提交
872
  code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pSmaFD);
H
Hongze Cheng 已提交
873
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
874

H
Hongze Cheng 已提交
875 876 877 878
  // stt
  for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
    tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname);
    code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->aSttFD[iStt]);
H
Hongze Cheng 已提交
879
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
880
  }
H
Hongze Cheng 已提交
881

H
Hongze Cheng 已提交
882 883 884
_exit:
  if (code) {
    *ppReader = NULL;
S
Shengliang Guan 已提交
885
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
886 887 888 889 890 891 892 893 894 895 896

    if (pReader) {
      for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) tsdbCloseFile(&pReader->aSttFD[iStt]);
      tsdbCloseFile(&pReader->pSmaFD);
      tsdbCloseFile(&pReader->pDataFD);
      tsdbCloseFile(&pReader->pHeadFD);
      taosMemoryFree(pReader);
    }
  } else {
    *ppReader = pReader;
  }
H
Hongze Cheng 已提交
897 898 899 900 901
  return code;
}

int32_t tsdbDataFReaderClose(SDataFReader **ppReader) {
  int32_t code = 0;
H
Hongze Cheng 已提交
902
  if (*ppReader == NULL) return code;
H
Hongze Cheng 已提交
903 904

  // head
H
Hongze Cheng 已提交
905
  tsdbCloseFile(&(*ppReader)->pHeadFD);
H
Hongze Cheng 已提交
906

H
Hongze Cheng 已提交
907
  // data
H
Hongze Cheng 已提交
908
  tsdbCloseFile(&(*ppReader)->pDataFD);
H
Hongze Cheng 已提交
909

H
Hongze Cheng 已提交
910
  // sma
H
Hongze Cheng 已提交
911
  tsdbCloseFile(&(*ppReader)->pSmaFD);
H
Hongze Cheng 已提交
912

H
Hongze Cheng 已提交
913
  // stt
H
Hongze Cheng 已提交
914
  for (int32_t iStt = 0; iStt < TSDB_MAX_STT_TRIGGER; iStt++) {
H
Hongze Cheng 已提交
915 916
    if ((*ppReader)->aSttFD[iStt]) {
      tsdbCloseFile(&(*ppReader)->aSttFD[iStt]);
H
Hongze Cheng 已提交
917
    }
H
Hongze Cheng 已提交
918 919 920 921
  }

  for (int32_t iBuf = 0; iBuf < sizeof((*ppReader)->aBuf) / sizeof(uint8_t *); iBuf++) {
    tFree((*ppReader)->aBuf[iBuf]);
H
Hongze Cheng 已提交
922
  }
H
Hongze Cheng 已提交
923 924
  taosMemoryFree(*ppReader);
  *ppReader = NULL;
H
Hongze Cheng 已提交
925
  return code;
H
Hongze Cheng 已提交
926 927
}

H
Hongze Cheng 已提交
928
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) {
H
Hongze Cheng 已提交
929 930 931 932
  int32_t    code = 0;
  SHeadFile *pHeadFile = pReader->pSet->pHeadF;
  int64_t    offset = pHeadFile->offset;
  int64_t    size = pHeadFile->size - offset;
H
Hongze Cheng 已提交
933

H
Hongze Cheng 已提交
934
  taosArrayClear(aBlockIdx);
H
Hongze Cheng 已提交
935
  if (size == 0) return code;
H
Hongze Cheng 已提交
936 937

  // alloc
H
Hongze Cheng 已提交
938
  code = tRealloc(&pReader->aBuf[0], size);
H
Hongze Cheng 已提交
939 940
  if (code) goto _err;

H
Hongze Cheng 已提交
941
  // read
H
Hongze Cheng 已提交
942 943
  code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size);
  if (code) goto _err;
H
Hongze Cheng 已提交
944 945

  // decode
H
Hongze Cheng 已提交
946
  int64_t n = 0;
H
Hongze Cheng 已提交
947
  while (n < size) {
H
Hongze Cheng 已提交
948 949 950 951 952 953 954
    SBlockIdx blockIdx;
    n += tGetBlockIdx(pReader->aBuf[0] + n, &blockIdx);

    if (taosArrayPush(aBlockIdx, &blockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
H
Hongze Cheng 已提交
955
  }
H
Hongze Cheng 已提交
956
  ASSERT(n == size);
H
Hongze Cheng 已提交
957 958 959 960

  return code;

_err:
H
Hongze Cheng 已提交
961
  tsdbError("vgId:%d, read block idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
962 963 964
  return code;
}

H
Hongze Cheng 已提交
965
int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk) {
H
Hongze Cheng 已提交
966
  int32_t   code = 0;
H
Hongze Cheng 已提交
967 968 969
  SSttFile *pSttFile = pReader->pSet->aSttF[iStt];
  int64_t   offset = pSttFile->offset;
  int64_t   size = pSttFile->size - offset;
H
Hongze Cheng 已提交
970

H
Hongze Cheng 已提交
971
  taosArrayClear(aSttBlk);
H
Hongze Cheng 已提交
972
  if (size == 0) return code;
H
Hongze Cheng 已提交
973 974

  // alloc
H
Hongze Cheng 已提交
975
  code = tRealloc(&pReader->aBuf[0], size);
H
Hongze Cheng 已提交
976 977
  if (code) goto _err;

H
Hongze Cheng 已提交
978
  // read
H
Hongze Cheng 已提交
979
  code = tsdbReadFile(pReader->aSttFD[iStt], offset, pReader->aBuf[0], size);
H
Hongze Cheng 已提交
980
  if (code) goto _err;
H
Hongze Cheng 已提交
981

H
Hongze Cheng 已提交
982
  // decode
H
Hongze Cheng 已提交
983
  int64_t n = 0;
H
Hongze Cheng 已提交
984
  while (n < size) {
H
Hongze Cheng 已提交
985 986
    SSttBlk sttBlk;
    n += tGetSttBlk(pReader->aBuf[0] + n, &sttBlk);
H
Hongze Cheng 已提交
987

H
Hongze Cheng 已提交
988
    if (taosArrayPush(aSttBlk, &sttBlk) == NULL) {
H
Hongze Cheng 已提交
989 990 991 992
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }
H
Hongze Cheng 已提交
993
  ASSERT(n == size);
H
Hongze Cheng 已提交
994

H
Hongze Cheng 已提交
995 996 997
  return code;

_err:
S
Shengliang Guan 已提交
998
  tsdbError("vgId:%d, read stt blk failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
999 1000 1001
  return code;
}

H
Hongze Cheng 已提交
1002
int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mDataBlk) {
H
Hongze Cheng 已提交
1003 1004 1005
  int32_t code = 0;
  int64_t offset = pBlockIdx->offset;
  int64_t size = pBlockIdx->size;
H
Hongze Cheng 已提交
1006 1007

  // alloc
H
Hongze Cheng 已提交
1008
  code = tRealloc(&pReader->aBuf[0], size);
H
Hongze Cheng 已提交
1009 1010
  if (code) goto _err;

H
Hongze Cheng 已提交
1011
  // read
H
Hongze Cheng 已提交
1012 1013
  code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size);
  if (code) goto _err;
H
Hongze Cheng 已提交
1014

H
Hongze Cheng 已提交
1015
  // decode
H
Hongze Cheng 已提交
1016
  int64_t n = tGetMapData(pReader->aBuf[0], mDataBlk);
H
Hongze Cheng 已提交
1017
  if (n < 0) {
H
Hongze Cheng 已提交
1018 1019 1020
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
H
Hongze Cheng 已提交
1021
  ASSERT(n == size);
H
Hongze Cheng 已提交
1022

H
Hongze Cheng 已提交
1023
  return code;
H
Hongze Cheng 已提交
1024

H
Hongze Cheng 已提交
1025 1026 1027
_err:
  tsdbError("vgId:%d, read block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  return code;
H
Hongze Cheng 已提交
1028
}
H
Hongze Cheng 已提交
1029

H
Hongze Cheng 已提交
1030 1031 1032
int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pDataBlk, SArray *aColumnDataAgg) {
  int32_t   code = 0;
  SSmaInfo *pSmaInfo = &pDataBlk->smaInfo;
H
Hongze Cheng 已提交
1033

H
Hongze Cheng 已提交
1034
  ASSERT(pSmaInfo->size > 0);
H
Hongze Cheng 已提交
1035

H
Hongze Cheng 已提交
1036
  taosArrayClear(aColumnDataAgg);
H
Hongze Cheng 已提交
1037

H
Hongze Cheng 已提交
1038
  // alloc
H
Hongze Cheng 已提交
1039
  code = tRealloc(&pReader->aBuf[0], pSmaInfo->size);
H
Hongze Cheng 已提交
1040
  if (code) goto _err;
H
Hongze Cheng 已提交
1041

H
Hongze Cheng 已提交
1042
  // read
H
Hongze Cheng 已提交
1043
  code = tsdbReadFile(pReader->pSmaFD, pSmaInfo->offset, pReader->aBuf[0], pSmaInfo->size);
H
Hongze Cheng 已提交
1044
  if (code) goto _err;
H
Hongze Cheng 已提交
1045

H
Hongze Cheng 已提交
1046
  // decode
H
Hongze Cheng 已提交
1047
  int32_t n = 0;
H
Hongze Cheng 已提交
1048 1049 1050
  while (n < pSmaInfo->size) {
    SColumnDataAgg sma;
    n += tGetColumnDataAgg(pReader->aBuf[0] + n, &sma);
H
Hongze Cheng 已提交
1051

H
Hongze Cheng 已提交
1052 1053
    if (taosArrayPush(aColumnDataAgg, &sma) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1054 1055
      goto _err;
    }
H
Hongze Cheng 已提交
1056
  }
H
Hongze Cheng 已提交
1057
  ASSERT(n == pSmaInfo->size);
H
Hongze Cheng 已提交
1058
  return code;
H
Hongze Cheng 已提交
1059

H
Hongze Cheng 已提交
1060
_err:
S
Shengliang Guan 已提交
1061
  tsdbError("vgId:%d, tsdb read block sma failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
1062 1063
  return code;
}
H
Hongze Cheng 已提交
1064

H
Hongze Cheng 已提交
1065 1066
static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo, SBlockData *pBlockData,
                                     int32_t iStt) {
H
Hongze Cheng 已提交
1067
  int32_t code = 0;
H
Hongze Cheng 已提交
1068

H
Hongze Cheng 已提交
1069 1070
  tBlockDataClear(pBlockData);

H
Hongze Cheng 已提交
1071
  STsdbFD *pFD = (iStt < 0) ? pReader->pDataFD : pReader->aSttFD[iStt];
H
Hongze Cheng 已提交
1072 1073

  // uid + version + tskey
H
Hongze Cheng 已提交
1074 1075 1076 1077 1078 1079
  code = tRealloc(&pReader->aBuf[0], pBlkInfo->szKey);
  if (code) goto _err;

  code = tsdbReadFile(pFD, pBlkInfo->offset, pReader->aBuf[0], pBlkInfo->szKey);
  if (code) goto _err;

H
Hongze Cheng 已提交
1080 1081 1082 1083 1084 1085
  SDiskDataHdr hdr;
  uint8_t     *p = pReader->aBuf[0] + tGetDiskDataHdr(pReader->aBuf[0], &hdr);

  ASSERT(hdr.delimiter == TSDB_FILE_DLMT);
  ASSERT(pBlockData->suid == hdr.suid);

H
Hongze Cheng 已提交
1086
  pBlockData->uid = hdr.uid;
H
Hongze Cheng 已提交
1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098
  pBlockData->nRow = hdr.nRow;

  // uid
  if (hdr.uid == 0) {
    ASSERT(hdr.szUid);
    code = tsdbDecmprData(p, hdr.szUid, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aUid,
                          sizeof(int64_t) * hdr.nRow, &pReader->aBuf[1]);
    if (code) goto _err;
  } else {
    ASSERT(!hdr.szUid);
  }
  p += hdr.szUid;
H
Hongze Cheng 已提交
1099

H
Hongze Cheng 已提交
1100 1101 1102 1103 1104
  // version
  code = tsdbDecmprData(p, hdr.szVer, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aVersion,
                        sizeof(int64_t) * hdr.nRow, &pReader->aBuf[1]);
  if (code) goto _err;
  p += hdr.szVer;
H
Hongze Cheng 已提交
1105

H
Hongze Cheng 已提交
1106 1107 1108
  // TSKEY
  code = tsdbDecmprData(p, hdr.szKey, TSDB_DATA_TYPE_TIMESTAMP, hdr.cmprAlg, (uint8_t **)&pBlockData->aTSKEY,
                        sizeof(TSKEY) * hdr.nRow, &pReader->aBuf[1]);
H
Hongze Cheng 已提交
1109
  if (code) goto _err;
H
Hongze Cheng 已提交
1110
  p += hdr.szKey;
H
Hongze Cheng 已提交
1111

H
Hongze Cheng 已提交
1112
  ASSERT(p - pReader->aBuf[0] == pBlkInfo->szKey);
H
Hongze Cheng 已提交
1113

H
Hongze Cheng 已提交
1114 1115
  // read and decode columns
  if (taosArrayGetSize(pBlockData->aIdx) == 0) goto _exit;
H
Hongze Cheng 已提交
1116

H
Hongze Cheng 已提交
1117 1118
  if (hdr.szBlkCol > 0) {
    int64_t offset = pBlkInfo->offset + pBlkInfo->szKey;
H
Hongze Cheng 已提交
1119 1120 1121 1122

    code = tRealloc(&pReader->aBuf[0], hdr.szBlkCol);
    if (code) goto _err;

H
Hongze Cheng 已提交
1123 1124
    code = tsdbReadFile(pFD, offset, pReader->aBuf[0], hdr.szBlkCol);
    if (code) goto _err;
H
Hongze Cheng 已提交
1125 1126
  }

H
Hongze Cheng 已提交
1127 1128 1129
  SBlockCol  blockCol = {.cid = 0};
  SBlockCol *pBlockCol = &blockCol;
  int32_t    n = 0;
H
Hongze Cheng 已提交
1130

H
Hongze Cheng 已提交
1131 1132
  for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
    SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
H
Hongze Cheng 已提交
1133

H
Hongze Cheng 已提交
1134 1135 1136 1137 1138 1139 1140
    while (pBlockCol && pBlockCol->cid < pColData->cid) {
      if (n < hdr.szBlkCol) {
        n += tGetBlockCol(pReader->aBuf[0] + n, pBlockCol);
      } else {
        ASSERT(n == hdr.szBlkCol);
        pBlockCol = NULL;
      }
H
Hongze Cheng 已提交
1141
    }
H
Hongze Cheng 已提交
1142

H
Hongze Cheng 已提交
1143 1144 1145 1146 1147 1148 1149 1150 1151
    if (pBlockCol == NULL || pBlockCol->cid > pColData->cid) {
      // add a lot of NONE
      for (int32_t iRow = 0; iRow < hdr.nRow; iRow++) {
        code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
        if (code) goto _err;
      }
    } else {
      ASSERT(pBlockCol->type == pColData->type);
      ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE);
H
Hongze Cheng 已提交
1152

H
Hongze Cheng 已提交
1153 1154 1155 1156 1157 1158 1159 1160
      if (pBlockCol->flag == HAS_NULL) {
        // add a lot of NULL
        for (int32_t iRow = 0; iRow < hdr.nRow; iRow++) {
          code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type));
          if (code) goto _err;
        }
      } else {
        // decode from binary
H
Hongze Cheng 已提交
1161 1162
        int64_t offset = pBlkInfo->offset + pBlkInfo->szKey + hdr.szBlkCol + pBlockCol->offset;
        int32_t size = pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue;
H
Hongze Cheng 已提交
1163

H
Hongze Cheng 已提交
1164 1165 1166
        code = tRealloc(&pReader->aBuf[1], size);
        if (code) goto _err;

H
Hongze Cheng 已提交
1167 1168
        code = tsdbReadFile(pFD, offset, pReader->aBuf[1], size);
        if (code) goto _err;
H
Hongze Cheng 已提交
1169 1170 1171 1172 1173

        code = tsdbDecmprColData(pReader->aBuf[1], pBlockCol, hdr.cmprAlg, hdr.nRow, pColData, &pReader->aBuf[2]);
        if (code) goto _err;
      }
    }
H
Hongze Cheng 已提交
1174
  }
H
Hongze Cheng 已提交
1175

H
Hongze Cheng 已提交
1176
_exit:
H
Hongze Cheng 已提交
1177 1178
  return code;

H
Hongze Cheng 已提交
1179
_err:
S
Shengliang Guan 已提交
1180
  tsdbError("vgId:%d, tsdb read block data impl failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
1181
  return code;
H
Hongze Cheng 已提交
1182
}
H
Hongze Cheng 已提交
1183

H
Hongze Cheng 已提交
1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202
int32_t tsdbReadDataBlockEx(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) {
  int32_t     code = 0;
  SBlockInfo *pBlockInfo = &pDataBlk->aSubBlock[0];

  // alloc
  code = tRealloc(&pReader->aBuf[0], pBlockInfo->szBlock);
  if (code) goto _err;

  // read
  code = tsdbReadFile(pReader->pDataFD, pBlockInfo->offset, pReader->aBuf[0], pBlockInfo->szBlock);
  if (code) goto _err;

  // decmpr
  code = tDecmprBlockData(pReader->aBuf[0], pBlockInfo->szBlock, pBlockData, &pReader->aBuf[1]);
  if (code) goto _err;

  return code;

_err:
S
Shengliang Guan 已提交
1203
  tsdbError("vgId:%d, tsdb read data block ex failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
1204
  return code;
H
Hongze Cheng 已提交
1205
}
H
Hongze Cheng 已提交
1206

H
Hongze Cheng 已提交
1207 1208
int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) {
  int32_t code = 0;
H
Hongze Cheng 已提交
1209

H
Hongze Cheng 已提交
1210
  code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[0], pBlockData, -1);
H
Hongze Cheng 已提交
1211
  if (code) goto _err;
H
Hongze Cheng 已提交
1212

H
Hongze Cheng 已提交
1213 1214 1215
  ASSERT(pDataBlk->nSubBlock == 1);

#if 0
H
Hongze Cheng 已提交
1216 1217 1218
  if (pDataBlk->nSubBlock > 1) {
    SBlockData bData1;
    SBlockData bData2;
H
Hongze Cheng 已提交
1219

H
Hongze Cheng 已提交
1220 1221 1222 1223 1224
    // create
    code = tBlockDataCreate(&bData1);
    if (code) goto _err;
    code = tBlockDataCreate(&bData2);
    if (code) goto _err;
H
Hongze Cheng 已提交
1225

H
Hongze Cheng 已提交
1226 1227 1228
    // init
    tBlockDataInitEx(&bData1, pBlockData);
    tBlockDataInitEx(&bData2, pBlockData);
H
Hongze Cheng 已提交
1229

H
Hongze Cheng 已提交
1230
    for (int32_t iSubBlock = 1; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) {
H
Hongze Cheng 已提交
1231
      code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[iSubBlock], &bData1);
H
Hongze Cheng 已提交
1232 1233 1234 1235 1236
      if (code) {
        tBlockDataDestroy(&bData1, 1);
        tBlockDataDestroy(&bData2, 1);
        goto _err;
      }
H
Hongze Cheng 已提交
1237

H
Hongze Cheng 已提交
1238 1239 1240 1241 1242 1243
      code = tBlockDataCopy(pBlockData, &bData2);
      if (code) {
        tBlockDataDestroy(&bData1, 1);
        tBlockDataDestroy(&bData2, 1);
        goto _err;
      }
H
Hongze Cheng 已提交
1244

H
Hongze Cheng 已提交
1245 1246 1247 1248 1249 1250 1251
      code = tBlockDataMerge(&bData1, &bData2, pBlockData);
      if (code) {
        tBlockDataDestroy(&bData1, 1);
        tBlockDataDestroy(&bData2, 1);
        goto _err;
      }
    }
H
Hongze Cheng 已提交
1252

H
Hongze Cheng 已提交
1253 1254
    tBlockDataDestroy(&bData1, 1);
    tBlockDataDestroy(&bData2, 1);
H
Hongze Cheng 已提交
1255
  }
H
Hongze Cheng 已提交
1256
#endif
H
Hongze Cheng 已提交
1257

H
Hongze Cheng 已提交
1258
  return code;
H
Hongze Cheng 已提交
1259

H
Hongze Cheng 已提交
1260
_err:
S
Shengliang Guan 已提交
1261
  tsdbError("vgId:%d, tsdb read data block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
1262 1263
  return code;
}
H
Hongze Cheng 已提交
1264

H
Hongze Cheng 已提交
1265
int32_t tsdbReadSttBlock(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData) {
H
Hongze Cheng 已提交
1266
  int32_t code = 0;
H
Hongze Cheng 已提交
1267
  int32_t lino = 0;
H
Hongze Cheng 已提交
1268

H
Hongze Cheng 已提交
1269
  code = tsdbReadBlockDataImpl(pReader, &pSttBlk->bInfo, pBlockData, iStt);
H
Hongze Cheng 已提交
1270 1271 1272 1273
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
S
Shengliang Guan 已提交
1274
    tsdbError("vgId:%d, %s failed at %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
1275 1276 1277 1278 1279 1280 1281
  }
  return code;
}

int32_t tsdbReadSttBlockEx(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData) {
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
1282

H
Hongze Cheng 已提交
1283
  // alloc
H
Hongze Cheng 已提交
1284
  code = tRealloc(&pReader->aBuf[0], pSttBlk->bInfo.szBlock);
H
Hongze Cheng 已提交
1285
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1286

H
Hongze Cheng 已提交
1287
  // read
H
Hongze Cheng 已提交
1288
  code = tsdbReadFile(pReader->aSttFD[iStt], pSttBlk->bInfo.offset, pReader->aBuf[0], pSttBlk->bInfo.szBlock);
H
Hongze Cheng 已提交
1289
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1290

H
Hongze Cheng 已提交
1291
  // decmpr
H
Hongze Cheng 已提交
1292
  code = tDecmprBlockData(pReader->aBuf[0], pSttBlk->bInfo.szBlock, pBlockData, &pReader->aBuf[1]);
H
Hongze Cheng 已提交
1293
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1294

H
Hongze Cheng 已提交
1295 1296
_exit:
  if (code) {
S
Shengliang Guan 已提交
1297
    tsdbError("vgId:%d, %s failed at %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
1298
  }
H
Hongze Cheng 已提交
1299 1300 1301 1302 1303 1304
  return code;
}

// SDelFWriter ====================================================
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb) {
  int32_t      code = 0;
H
Hongze Cheng 已提交
1305
  int32_t      lino = 0;
H
Hongze Cheng 已提交
1306
  char         fname[TSDB_FILENAME_LEN];
H
Hongze Cheng 已提交
1307
  uint8_t      hdr[TSDB_FHDR_SIZE] = {0};
H
Hongze Cheng 已提交
1308
  SDelFWriter *pDelFWriter = NULL;
H
Hongze Cheng 已提交
1309 1310 1311 1312 1313 1314
  int64_t      n;

  // alloc
  pDelFWriter = (SDelFWriter *)taosMemoryCalloc(1, sizeof(*pDelFWriter));
  if (pDelFWriter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1315
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1316 1317 1318 1319 1320
  }
  pDelFWriter->pTsdb = pTsdb;
  pDelFWriter->fDel = *pFile;

  tsdbDelFileName(pTsdb, pFile, fname);
H
Hongze Cheng 已提交
1321 1322
  code = tsdbOpenFile(fname, pTsdb->pVnode->config.tsdbPageSize, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE,
                      &pDelFWriter->pWriteH);
H
Hongze Cheng 已提交
1323
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1324 1325

  // update header
H
Hongze Cheng 已提交
1326
  code = tsdbWriteFile(pDelFWriter->pWriteH, 0, hdr, TSDB_FHDR_SIZE);
H
Hongze Cheng 已提交
1327
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1328 1329 1330 1331 1332 1333

  pDelFWriter->fDel.size = TSDB_FHDR_SIZE;
  pDelFWriter->fDel.offset = 0;

  *ppWriter = pDelFWriter;

H
Hongze Cheng 已提交
1334 1335 1336 1337
_exit:
  if (code) {
    if (pDelFWriter) {
      tsdbCloseFile(&pDelFWriter->pWriteH);
H
Hongze Cheng 已提交
1338
      taosMemoryFree(pDelFWriter);
H
Hongze Cheng 已提交
1339 1340
    }
    *ppWriter = NULL;
S
Shengliang Guan 已提交
1341
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(errno));
H
Hongze Cheng 已提交
1342 1343 1344
  } else {
    *ppWriter = pDelFWriter;
  }
H
Hongze Cheng 已提交
1345 1346 1347 1348 1349 1350 1351 1352 1353
  return code;
}

int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync) {
  int32_t      code = 0;
  SDelFWriter *pWriter = *ppWriter;
  STsdb       *pTsdb = pWriter->pTsdb;

  // sync
H
Hongze Cheng 已提交
1354 1355 1356
  if (sync) {
    code = tsdbFsyncFile(pWriter->pWriteH);
    if (code) goto _err;
H
Hongze Cheng 已提交
1357 1358 1359
  }

  // close
H
Hongze Cheng 已提交
1360
  tsdbCloseFile(&pWriter->pWriteH);
H
Hongze Cheng 已提交
1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380

  for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t *); iBuf++) {
    tFree(pWriter->aBuf[iBuf]);
  }
  taosMemoryFree(pWriter);

  *ppWriter = NULL;
  return code;

_err:
  tsdbError("vgId:%d, failed to close del file writer since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, SDelIdx *pDelIdx) {
  int32_t code = 0;
  int64_t size;
  int64_t n;

  // prepare
H
Hongze Cheng 已提交
1381
  size = 0;
H
Hongze Cheng 已提交
1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394
  for (int32_t iDelData = 0; iDelData < taosArrayGetSize(aDelData); iDelData++) {
    size += tPutDelData(NULL, taosArrayGet(aDelData, iDelData));
  }

  // alloc
  code = tRealloc(&pWriter->aBuf[0], size);
  if (code) goto _err;

  // build
  n = 0;
  for (int32_t iDelData = 0; iDelData < taosArrayGetSize(aDelData); iDelData++) {
    n += tPutDelData(pWriter->aBuf[0] + n, taosArrayGet(aDelData, iDelData));
  }
H
Hongze Cheng 已提交
1395
  ASSERT(n == size);
H
Hongze Cheng 已提交
1396 1397

  // write
H
Hongze Cheng 已提交
1398 1399
  code = tsdbWriteFile(pWriter->pWriteH, pWriter->fDel.size, pWriter->aBuf[0], size);
  if (code) goto _err;
H
Hongze Cheng 已提交
1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419

  // update
  pDelIdx->offset = pWriter->fDel.size;
  pDelIdx->size = size;
  pWriter->fDel.size += size;

  return code;

_err:
  tsdbError("vgId:%d, failed to write del data since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  return code;
}

int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SArray *aDelIdx) {
  int32_t  code = 0;
  int64_t  size;
  int64_t  n;
  SDelIdx *pDelIdx;

  // prepare
H
Hongze Cheng 已提交
1420
  size = 0;
H
Hongze Cheng 已提交
1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433
  for (int32_t iDelIdx = 0; iDelIdx < taosArrayGetSize(aDelIdx); iDelIdx++) {
    size += tPutDelIdx(NULL, taosArrayGet(aDelIdx, iDelIdx));
  }

  // alloc
  code = tRealloc(&pWriter->aBuf[0], size);
  if (code) goto _err;

  // build
  n = 0;
  for (int32_t iDelIdx = 0; iDelIdx < taosArrayGetSize(aDelIdx); iDelIdx++) {
    n += tPutDelIdx(pWriter->aBuf[0] + n, taosArrayGet(aDelIdx, iDelIdx));
  }
H
Hongze Cheng 已提交
1434
  ASSERT(n == size);
H
Hongze Cheng 已提交
1435 1436

  // write
H
Hongze Cheng 已提交
1437 1438
  code = tsdbWriteFile(pWriter->pWriteH, pWriter->fDel.size, pWriter->aBuf[0], size);
  if (code) goto _err;
H
Hongze Cheng 已提交
1439

H
Hongze Cheng 已提交
1440 1441 1442
  // update
  pWriter->fDel.offset = pWriter->fDel.size;
  pWriter->fDel.size += size;
H
Hongze Cheng 已提交
1443

H
Hongze Cheng 已提交
1444
  return code;
H
Hongze Cheng 已提交
1445

H
Hongze Cheng 已提交
1446 1447 1448 1449 1450 1451 1452
_err:
  tsdbError("vgId:%d, write del idx failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  return code;
}

int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
1453
  char    hdr[TSDB_FHDR_SIZE] = {0};
H
Hongze Cheng 已提交
1454 1455 1456 1457 1458
  int64_t size = TSDB_FHDR_SIZE;
  int64_t n;

  // build
  tPutDelFile(hdr, &pWriter->fDel);
H
Hongze Cheng 已提交
1459

H
Hongze Cheng 已提交
1460
  // write
H
Hongze Cheng 已提交
1461 1462
  code = tsdbWriteFile(pWriter->pWriteH, 0, hdr, size);
  if (code) goto _err;
H
Hongze Cheng 已提交
1463 1464 1465 1466

  return code;

_err:
H
Hongze Cheng 已提交
1467
  tsdbError("vgId:%d, update del file hdr failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
1468
  return code;
H
Hongze Cheng 已提交
1469
}
H
Hongze Cheng 已提交
1470 1471
// SDelFReader ====================================================
struct SDelFReader {
H
Hongze Cheng 已提交
1472 1473 1474
  STsdb   *pTsdb;
  SDelFile fDel;
  STsdbFD *pReadH;
H
Hongze Cheng 已提交
1475 1476
  uint8_t *aBuf[1];
};
H
Hongze Cheng 已提交
1477

H
Hongze Cheng 已提交
1478 1479
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb) {
  int32_t      code = 0;
H
Hongze Cheng 已提交
1480
  int32_t      lino = 0;
H
Hongze Cheng 已提交
1481
  char         fname[TSDB_FILENAME_LEN];
H
Hongze Cheng 已提交
1482
  SDelFReader *pDelFReader = NULL;
H
Hongze Cheng 已提交
1483

H
Hongze Cheng 已提交
1484 1485 1486
  // alloc
  pDelFReader = (SDelFReader *)taosMemoryCalloc(1, sizeof(*pDelFReader));
  if (pDelFReader == NULL) {
H
Hongze Cheng 已提交
1487
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1488
    goto _exit;
H
Hongze Cheng 已提交
1489 1490 1491 1492 1493 1494 1495
  }

  // open impl
  pDelFReader->pTsdb = pTsdb;
  pDelFReader->fDel = *pFile;

  tsdbDelFileName(pTsdb, pFile, fname);
H
Hongze Cheng 已提交
1496
  code = tsdbOpenFile(fname, pTsdb->pVnode->config.tsdbPageSize, TD_FILE_READ, &pDelFReader->pReadH);
H
Hongze Cheng 已提交
1497 1498 1499 1500
  if (code) {
    taosMemoryFree(pDelFReader);
    goto _exit;
  }
H
Hongze Cheng 已提交
1501

H
Hongze Cheng 已提交
1502 1503 1504
_exit:
  if (code) {
    *ppReader = NULL;
S
Shengliang Guan 已提交
1505
    tsdbError("vgId:%d, %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
1506 1507 1508
  } else {
    *ppReader = pDelFReader;
  }
H
Hongze Cheng 已提交
1509
  return code;
H
Hongze Cheng 已提交
1510 1511
}

H
Hongze Cheng 已提交
1512 1513 1514
int32_t tsdbDelFReaderClose(SDelFReader **ppReader) {
  int32_t      code = 0;
  SDelFReader *pReader = *ppReader;
H
Hongze Cheng 已提交
1515

H
Hongze Cheng 已提交
1516
  if (pReader) {
H
Hongze Cheng 已提交
1517
    tsdbCloseFile(&pReader->pReadH);
H
Hongze Cheng 已提交
1518 1519 1520 1521
    for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(uint8_t *); iBuf++) {
      tFree(pReader->aBuf[iBuf]);
    }
    taosMemoryFree(pReader);
H
Hongze Cheng 已提交
1522
  }
H
Hongze Cheng 已提交
1523
  *ppReader = NULL;
H
Hongze Cheng 已提交
1524 1525 1526 1527 1528

_exit:
  return code;
}

H
Hongze Cheng 已提交
1529
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData) {
H
Hongze Cheng 已提交
1530
  int32_t code = 0;
H
Hongze Cheng 已提交
1531 1532 1533
  int64_t offset = pDelIdx->offset;
  int64_t size = pDelIdx->size;
  int64_t n;
H
Hongze Cheng 已提交
1534

H
Hongze Cheng 已提交
1535
  taosArrayClear(aDelData);
H
Hongze Cheng 已提交
1536

H
Hongze Cheng 已提交
1537 1538 1539
  // alloc
  code = tRealloc(&pReader->aBuf[0], size);
  if (code) goto _err;
H
Hongze Cheng 已提交
1540

H
Hongze Cheng 已提交
1541
  // read
H
Hongze Cheng 已提交
1542 1543
  code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size);
  if (code) goto _err;
H
Hongze Cheng 已提交
1544 1545 1546

  // // decode
  n = 0;
H
Hongze Cheng 已提交
1547
  while (n < size) {
H
Hongze Cheng 已提交
1548 1549 1550 1551 1552 1553
    SDelData delData;
    n += tGetDelData(pReader->aBuf[0] + n, &delData);

    if (taosArrayPush(aDelData, &delData) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
H
Hongze Cheng 已提交
1554 1555
    }
  }
H
Hongze Cheng 已提交
1556
  ASSERT(n == size);
H
Hongze Cheng 已提交
1557 1558 1559 1560 1561

  return code;

_err:
  tsdbError("vgId:%d, read del data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
1562 1563 1564
  return code;
}

H
Hongze Cheng 已提交
1565
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) {
H
Hongze Cheng 已提交
1566
  int32_t code = 0;
H
Hongze Cheng 已提交
1567 1568 1569
  int32_t n;
  int64_t offset = pReader->fDel.offset;
  int64_t size = pReader->fDel.size - offset;
H
Hongze Cheng 已提交
1570

H
Hongze Cheng 已提交
1571 1572 1573 1574 1575 1576 1577
  taosArrayClear(aDelIdx);

  // alloc
  code = tRealloc(&pReader->aBuf[0], size);
  if (code) goto _err;

  // read
H
Hongze Cheng 已提交
1578 1579
  code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size);
  if (code) goto _err;
H
Hongze Cheng 已提交
1580

H
Hongze Cheng 已提交
1581 1582
  // decode
  n = 0;
H
Hongze Cheng 已提交
1583
  while (n < size) {
H
Hongze Cheng 已提交
1584
    SDelIdx delIdx;
H
Hongze Cheng 已提交
1585

H
Hongze Cheng 已提交
1586
    n += tGetDelIdx(pReader->aBuf[0] + n, &delIdx);
H
Hongze Cheng 已提交
1587

H
Hongze Cheng 已提交
1588 1589 1590 1591
    if (taosArrayPush(aDelIdx, &delIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
H
Hongze Cheng 已提交
1592 1593
  }

H
Hongze Cheng 已提交
1594
  ASSERT(n == size);
H
Hongze Cheng 已提交
1595

H
Hongze Cheng 已提交
1596
  return code;
H
Hongze Cheng 已提交
1597

H
Hongze Cheng 已提交
1598 1599
_err:
  tsdbError("vgId:%d, read del idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
1600
  return code;
1601
}