tsdbReaderWriter.c 37.5 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
// =============== PAGE-WISE FILE ===============
H
Hongze Cheng 已提交
19
// Open `path` as a page-wise file and hand back a heap-allocated handle.
//
// The handle and a private copy of `path` live in one allocation (the path is
// stored directly behind the struct). A zero-filled page buffer of `szPage`
// bytes is allocated separately. pFD->szFile is set to the size of the file in
// pages; the byte size is asserted to be a multiple of `szPage`.
//
// path   - file name, passed unchanged to taosOpenFile()/taosStatFile()
// szPage - page size in bytes
// flag   - open flags forwarded to taosOpenFile()
// ppFD   - out: receives the new handle, or NULL on failure
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY or TAOS_SYSTEM_ERROR(errno) on
// failure. On failure everything acquired so far (file handle, page buffer,
// handle struct) is released before returning.
static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD) {
  int32_t  code = 0;
  STsdbFD *pFD = NULL;

  *ppFD = NULL;

  pFD = (STsdbFD *)taosMemoryCalloc(1, sizeof(*pFD) + strlen(path) + 1);
  if (pFD == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // the path copy lives in the tail of the same allocation
  pFD->path = (char *)&pFD[1];
  strcpy(pFD->path, path);
  pFD->szPage = szPage;
  pFD->flag = flag;
  pFD->pgno = 0;  // 0 == no page currently held in pBuf

  pFD->pFD = taosOpenFile(path, flag);
  if (pFD->pFD == NULL) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  pFD->pBuf = taosMemoryCalloc(1, szPage);
  if (pFD->pBuf == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  if (taosStatFile(path, &pFD->szFile, NULL) < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  // convert the byte size reported by stat into a page count
  ASSERT(pFD->szFile % szPage == 0);
  pFD->szFile = pFD->szFile / szPage;

  *ppFD = pFD;
  return code;

_err:
  // `code` was captured before cleanup, so later errno changes do not matter
  if (pFD->pFD != NULL) {
    taosCloseFile(&pFD->pFD);
  }
  if (pFD->pBuf != NULL) {
    taosMemoryFree(pFD->pBuf);
  }
  taosMemoryFree(pFD);

_exit:
  return code;
}
H
Hongze Cheng 已提交
59

H
Hongze Cheng 已提交
60 61
// Release a page-wise file handle: free its page buffer, close the underlying
// file, free the handle itself and reset the caller's pointer to NULL.
// *ppFD must be a valid handle; it is dereferenced unconditionally.
static void tsdbCloseFile(STsdbFD **ppFD) {
  STsdbFD *pFile = *ppFD;

  taosMemoryFree(pFile->pBuf);
  taosCloseFile(&pFile->pFD);

  // pFile->path points into the same allocation, so one free covers both
  taosMemoryFree(pFile);

  *ppFD = NULL;
}

H
Hongze Cheng 已提交
68
// Write the page currently held in pFD->pBuf (if any) back to the file.
//
// pFD->pgno > 0 means a page is buffered. In that case the function seeks to
// that page, appends the checksum to the buffer, writes the whole page and
// grows pFD->szFile if the page lies beyond the known end. On success (or when
// no page is buffered) pFD->pgno is reset to 0. On failure pFD->pgno is left
// unchanged and TAOS_SYSTEM_ERROR(errno) is returned.
static int32_t tsdbWriteFilePage(STsdbFD *pFD) {
  if (pFD->pgno <= 0) {
    // nothing buffered
    pFD->pgno = 0;
    return 0;
  }

  int64_t ret = taosLSeekFile(pFD->pFD, PAGE_OFFSET(pFD->pgno, pFD->szPage), SEEK_SET);
  if (ret < 0) {
    return TAOS_SYSTEM_ERROR(errno);
  }

  // stamp the page with its checksum right before it goes to disk
  taosCalcChecksumAppend(0, pFD->pBuf, pFD->szPage);

  ret = taosWriteFile(pFD->pFD, pFD->pBuf, pFD->szPage);
  if (ret < 0) {
    return TAOS_SYSTEM_ERROR(errno);
  }

  // the file grew if we just wrote past the last known page
  if (pFD->pgno > pFD->szFile) {
    pFD->szFile = pFD->pgno;
  }

  pFD->pgno = 0;
  return 0;
}

H
Hongze Cheng 已提交
96
// Load page `pgno` of the file into pFD->pBuf and verify its checksum.
//
// `pgno` must not exceed pFD->szFile (asserted). On success pFD->pgno is set
// to `pgno`. Returns TAOS_SYSTEM_ERROR(errno) if seek/read fails and
// TSDB_CODE_FILE_CORRUPTED on a short read or a checksum mismatch; in all
// failure cases pFD->pgno is left unchanged.
static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) {
  ASSERT(pgno <= pFD->szFile);

  // position at the start of the page
  int64_t pageStart = PAGE_OFFSET(pgno, pFD->szPage);
  int64_t ret = taosLSeekFile(pFD->pFD, pageStart, SEEK_SET);
  if (ret < 0) {
    return TAOS_SYSTEM_ERROR(errno);
  }

  // pull in exactly one page
  ret = taosReadFile(pFD->pFD, pFD->pBuf, pFD->szPage);
  if (ret < 0) {
    return TAOS_SYSTEM_ERROR(errno);
  }
  if (ret < pFD->szPage) {
    return TSDB_CODE_FILE_CORRUPTED;  // truncated page
  }

  // validate the checksum stored with the page
  if (!taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
    return TSDB_CODE_FILE_CORRUPTED;
  }

  pFD->pgno = pgno;
  return 0;
}
H
Hongze Cheng 已提交
130

H
Hongze Cheng 已提交
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
// Write `size` bytes from `pBuf` at logical offset `offset` through the
// single-page buffer of `pFD`.
//
// The logical offset is translated to a file offset with
// LOGIC_TO_FILE_OFFSET(), from which the target page number and the position
// inside that page are derived. For every page touched:
//   - if a different page is buffered, it is flushed with tsdbWriteFilePage();
//   - if the target page already exists in the file it is loaded first, so the
//     bytes outside the written range are preserved; otherwise the buffer is
//     simply re-labelled as the new page;
//   - as many bytes as fit into the page's content area are copied in.
// The last page touched stays in the buffer; it reaches the file on the next
// page switch or via tsdbFsyncFile().
//
// Pages are numbered from 1 and pFD->szFile is the number of the last page in
// the file (see the assertions in tsdbReadFilePage()/tsdbReadFile() and the
// szFile update in tsdbWriteFilePage()), so an existing page satisfies
// `pgno <= szFile`. The previous `pgno < szFile` test skipped loading the last
// page of the file, so a partial write into it would clobber the rest of that
// page with whatever the buffer happened to contain.
//
// Returns 0 on success or the error code from flushing/loading a page.
static int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
  int32_t code = 0;
  int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
  int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
  int64_t bOffset = fOffset % pFD->szPage;  // position inside the first page
  int64_t n = 0;                            // bytes consumed from pBuf so far

  do {
    if (pFD->pgno != pgno) {
      // switch pages: flush whatever is buffered, then bring in the target
      code = tsdbWriteFilePage(pFD);
      if (code) goto _exit;

      if (pgno <= pFD->szFile) {
        // page already exists on disk: load it so untouched bytes survive
        code = tsdbReadFilePage(pFD, pgno);
        if (code) goto _exit;
      } else {
        // brand-new page past the end of the file
        pFD->pgno = pgno;
      }
    }

    int64_t nWrite = TMIN(PAGE_CONTENT_SIZE(pFD->szPage) - bOffset, size - n);
    memcpy(pFD->pBuf + bOffset, pBuf + n, nWrite);

    // every following page is filled from its beginning
    pgno++;
    bOffset = 0;
    n += nWrite;
  } while (n < size);

_exit:
  return code;
}

H
Hongze Cheng 已提交
163
// Read `size` bytes starting at logical offset `offset` into `pBuf`, going
// through the single-page buffer of `pFD`.
//
// The logical offset is mapped to a file offset, then to a page number and an
// in-page position. Pages are loaded on demand with tsdbReadFilePage() and the
// content portion of each page is copied out until `size` bytes are delivered.
// The first page must be non-zero and within the file, and the in-page
// position must fall inside the page's content area (both asserted).
//
// Returns 0 on success or the error from tsdbReadFilePage().
static int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
  int32_t code = 0;
  int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
  int64_t iPage = OFFSET_PGNO(fOffset, pFD->szPage);
  int32_t szPgCont = PAGE_CONTENT_SIZE(pFD->szPage);
  int64_t inPage = fOffset % pFD->szPage;

  ASSERT(iPage && iPage <= pFD->szFile);
  ASSERT(inPage < szPgCont);

  // after the first page, every page is consumed from its beginning
  for (int64_t nDone = 0; nDone < size; iPage++, inPage = 0) {
    if (pFD->pgno != iPage) {
      code = tsdbReadFilePage(pFD, iPage);
      if (code) break;
    }

    int64_t nChunk = TMIN(szPgCont - inPage, size - nDone);
    memcpy(pBuf + nDone, pFD->pBuf + inPage, nChunk);
    nDone += nChunk;
  }

  return code;
}

H
Hongze Cheng 已提交
192
// Flush the buffered page (if any) with tsdbWriteFilePage() and then fsync the
// underlying file. Returns 0 on success, the page-write error, or
// TAOS_SYSTEM_ERROR(errno) when the fsync itself fails.
static int32_t tsdbFsyncFile(STsdbFD *pFD) {
  int32_t code = tsdbWriteFilePage(pFD);

  // only sync once the pending page has actually been written
  if (code == 0 && taosFsyncFile(pFD->pFD) < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
  }

  return code;
}

H
Hongze Cheng 已提交
207 208
// SDataFWriter ====================================================
// Create a writer for the data file set described by `pSet`.
//
// The writer keeps its own copies of the head/data/sma/sst file descriptors
// (pWriter->fHead, fData, fSma, fSst[]) and exposes them through
// pWriter->wSet. Files are opened as follows:
//   - head: always created/truncated, and a zeroed header is written;
//   - data, sma: created/truncated with a zeroed header when their recorded
//     size is 0, otherwise opened read/write as they are;
//   - sst: only the last entry (index nSstF - 1) is opened; it must have size
//     0 (asserted) and is created/truncated with a zeroed header.
// Every time a header is written the corresponding recorded size is advanced
// by TSDB_FHDR_SIZE.
//
// ppWriter - out: the new writer on success, NULL on failure
// pTsdb    - owning tsdb instance (used for file names and logging)
// pSet     - file set to write; its descriptors are copied, not referenced
//
// Returns 0 on success or an error code. On failure all files opened so far
// are closed and the writer is freed, so nothing is leaked.
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet) {
  int32_t       code = 0;
  int32_t       flag;
  int32_t       szPage = TSDB_DEFAULT_PAGE_SIZE;
  SDataFWriter *pWriter = NULL;
  char          fname[TSDB_FILENAME_LEN];
  char          hdr[TSDB_FHDR_SIZE] = {0};

  // alloc
  pWriter = taosMemoryCalloc(1, sizeof(*pWriter));
  if (pWriter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pWriter->pTsdb = pTsdb;
  pWriter->wSet = (SDFileSet){.diskId = pSet->diskId,
                              .fid = pSet->fid,
                              .pHeadF = &pWriter->fHead,
                              .pDataF = &pWriter->fData,
                              .pSmaF = &pWriter->fSma,
                              .nSstF = pSet->nSstF};
  pWriter->fHead = *pSet->pHeadF;
  pWriter->fData = *pSet->pDataF;
  pWriter->fSma = *pSet->pSmaF;
  for (int8_t iSst = 0; iSst < pSet->nSstF; iSst++) {
    pWriter->wSet.aSstF[iSst] = &pWriter->fSst[iSst];
    pWriter->fSst[iSst] = *pSet->aSstF[iSst];
  }

  // head
  flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
  tsdbHeadFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fHead, fname);
  code = tsdbOpenFile(fname, szPage, flag, &pWriter->pHeadFD);
  if (code) goto _err;

  code = tsdbWriteFile(pWriter->pHeadFD, 0, hdr, TSDB_FHDR_SIZE);
  if (code) goto _err;
  pWriter->fHead.size += TSDB_FHDR_SIZE;

  // data
  if (pWriter->fData.size == 0) {
    flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
  } else {
    flag = TD_FILE_READ | TD_FILE_WRITE;
  }
  tsdbDataFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fData, fname);
  code = tsdbOpenFile(fname, szPage, flag, &pWriter->pDataFD);
  if (code) goto _err;
  if (pWriter->fData.size == 0) {
    code = tsdbWriteFile(pWriter->pDataFD, 0, hdr, TSDB_FHDR_SIZE);
    if (code) goto _err;
    pWriter->fData.size += TSDB_FHDR_SIZE;
  }

  // sma
  if (pWriter->fSma.size == 0) {
    flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
  } else {
    flag = TD_FILE_READ | TD_FILE_WRITE;
  }
  tsdbSmaFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSma, fname);
  code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSmaFD);
  if (code) goto _err;
  if (pWriter->fSma.size == 0) {
    code = tsdbWriteFile(pWriter->pSmaFD, 0, hdr, TSDB_FHDR_SIZE);
    if (code) goto _err;

    pWriter->fSma.size += TSDB_FHDR_SIZE;
  }

  // sst
  ASSERT(pWriter->fSst[pSet->nSstF - 1].size == 0);
  flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
  tsdbSstFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSst[pSet->nSstF - 1], fname);
  code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSstFD);
  if (code) goto _err;
  code = tsdbWriteFile(pWriter->pSstFD, 0, hdr, TSDB_FHDR_SIZE);
  if (code) goto _err;
  pWriter->fSst[pWriter->wSet.nSstF - 1].size += TSDB_FHDR_SIZE;

  *ppWriter = pWriter;
  return code;

_err:
  tsdbError("vgId:%d, tsdb data file writer open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  if (pWriter != NULL) {
    // The writer was calloc'ed and tsdbOpenFile() resets its out-pointer to
    // NULL on failure, so a non-NULL handle here is one that was really opened.
    if (pWriter->pHeadFD != NULL) tsdbCloseFile(&pWriter->pHeadFD);
    if (pWriter->pDataFD != NULL) tsdbCloseFile(&pWriter->pDataFD);
    if (pWriter->pSmaFD != NULL) tsdbCloseFile(&pWriter->pSmaFD);
    if (pWriter->pSstFD != NULL) tsdbCloseFile(&pWriter->pSstFD);
    taosMemoryFree(pWriter);
  }
  *ppWriter = NULL;
  return code;
}

H
Hongze Cheng 已提交
298
// Close a data file writer and reset *ppWriter to NULL.
//
// A NULL *ppWriter is accepted and is a no-op. When `sync` is non-zero the
// head, data, sma and sst files are fsync'ed (in that order) first; if any
// sync fails the error is logged and returned, and the writer is left open and
// untouched. Otherwise all four files are closed, the scratch buffers in aBuf
// are released and the writer itself is freed.
int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) {
  int32_t       code = 0;
  STsdb        *pTsdb = NULL;
  SDataFWriter *pWriter = *ppWriter;

  if (pWriter != NULL) {
    pTsdb = pWriter->pTsdb;

    if (sync) {
      STsdbFD *aSyncFD[] = {pWriter->pHeadFD, pWriter->pDataFD, pWriter->pSmaFD, pWriter->pSstFD};

      for (int32_t iFD = 0; iFD < (int32_t)(sizeof(aSyncFD) / sizeof(aSyncFD[0])); iFD++) {
        code = tsdbFsyncFile(aSyncFD[iFD]);
        if (code) goto _err;
      }
    }

    tsdbCloseFile(&pWriter->pHeadFD);
    tsdbCloseFile(&pWriter->pDataFD);
    tsdbCloseFile(&pWriter->pSmaFD);
    tsdbCloseFile(&pWriter->pSstFD);

    // release the scratch buffers
    for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t *); iBuf++) {
      tFree(pWriter->aBuf[iBuf]);
    }
    taosMemoryFree(pWriter);
  }

  *ppWriter = NULL;
  return code;

_err:
  tsdbError("vgId:%d, data file writer close failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
337 338 339 340
// Rewrite the header block at logical offset 0 of the head, data, sma and
// (last) sst file from the writer's current in-memory descriptors.
//
// For each file a TSDB_FHDR_SIZE scratch header is zeroed, the descriptor is
// encoded into it with the matching tPut*File() routine, and the result is
// written through tsdbWriteFile(). The first failure is logged and returned;
// files after the failing one are not updated.
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter) {
  int32_t code = 0;
  char    hdr[TSDB_FHDR_SIZE];

  // head file
  memset(hdr, 0, sizeof(hdr));
  tPutHeadFile(hdr, &pWriter->fHead);
  if ((code = tsdbWriteFile(pWriter->pHeadFD, 0, hdr, TSDB_FHDR_SIZE)) != 0) goto _err;

  // data file
  memset(hdr, 0, sizeof(hdr));
  tPutDataFile(hdr, &pWriter->fData);
  if ((code = tsdbWriteFile(pWriter->pDataFD, 0, hdr, TSDB_FHDR_SIZE)) != 0) goto _err;

  // sma file
  memset(hdr, 0, sizeof(hdr));
  tPutSmaFile(hdr, &pWriter->fSma);
  if ((code = tsdbWriteFile(pWriter->pSmaFD, 0, hdr, TSDB_FHDR_SIZE)) != 0) goto _err;

  // sst file (only the last one is open for writing)
  memset(hdr, 0, sizeof(hdr));
  tPutSstFile(hdr, &pWriter->fSst[pWriter->wSet.nSstF - 1]);
  if ((code = tsdbWriteFile(pWriter->pSstFD, 0, hdr, TSDB_FHDR_SIZE)) != 0) goto _err;

  return code;

_err:
  tsdbError("vgId:%d, update DFileSet header failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
373 374 375
// Append the encoded SBlockIdx entries of `aBlockIdx` to the head file.
//
// With an empty array nothing is written and fHead.offset is simply set to the
// current fHead.size. Otherwise the entries are sized, encoded back to back
// into pWriter->aBuf[0], written at the current end of the head file, and
// fHead.offset/fHead.size are updated to describe the new region.
//
// Returns 0 on success; allocation or write failures are logged and returned.
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx) {
  int32_t    code = 0;
  SHeadFile *pHeadFile = &pWriter->fHead;
  int32_t    nBlockIdx = taosArrayGetSize(aBlockIdx);
  int64_t    size = 0;
  int64_t    n = 0;

  if (nBlockIdx == 0) {
    // nothing to store: the index region is empty and starts at the file end
    pHeadFile->offset = pHeadFile->size;
    return code;
  }

  // first pass: how many bytes do the encoded entries need?
  for (int32_t i = 0; i < nBlockIdx; i++) {
    size += tPutBlockIdx(NULL, taosArrayGet(aBlockIdx, i));
  }

  code = tRealloc(&pWriter->aBuf[0], size);
  if (code) goto _err;

  // second pass: encode into the scratch buffer
  for (int32_t i = 0; i < nBlockIdx; i++) {
    n += tPutBlockIdx(pWriter->aBuf[0] + n, taosArrayGet(aBlockIdx, i));
  }
  ASSERT(n == size);

  code = tsdbWriteFile(pWriter->pHeadFD, pHeadFile->size, pWriter->aBuf[0], size);
  if (code) goto _err;

  // the index region begins where the file used to end
  pHeadFile->offset = pHeadFile->size;
  pHeadFile->size += size;

  return code;

_err:
  tsdbError("vgId:%d, write block idx failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
421 422 423 424 425 426 427
// Append the encoded map `mBlock` to the head file and record its location in
// `pBlockIdx`.
//
// `mBlock` must contain at least one item (asserted). The map is encoded into
// pWriter->aBuf[0] and written at the current end of the head file; on success
// pBlockIdx->offset/size describe the written region and fHead.size grows by
// the encoded size. Allocation or write failures are logged and returned.
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, SBlockIdx *pBlockIdx) {
  int32_t    code = 0;
  SHeadFile *pHeadFile = &pWriter->fHead;
  int64_t    size;

  ASSERT(mBlock->nItem > 0);

  // size the encoding, then make room for it
  size = tPutMapData(NULL, mBlock);
  code = tRealloc(&pWriter->aBuf[0], size);

  if (code == 0) {
    // encode and append to the head file
    (void)tPutMapData(pWriter->aBuf[0], mBlock);
    code = tsdbWriteFile(pWriter->pHeadFD, pHeadFile->size, pWriter->aBuf[0], size);
  }

  if (code) {
    tsdbError("vgId:%d, write block failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
    return code;
  }

  // remember where the map landed
  pBlockIdx->offset = pHeadFile->size;
  pBlockIdx->size = size;
  pHeadFile->size += size;

  tsdbTrace("vgId:%d, write block, file ID:%d commit ID:%d suid:%" PRId64 " uid:%" PRId64 " offset:%" PRId64
            " size:%" PRId64 " nItem:%d",
            TD_VID(pWriter->pTsdb->pVnode), pWriter->wSet.fid, pHeadFile->commitID, pBlockIdx->suid, pBlockIdx->uid,
            pBlockIdx->offset, pBlockIdx->size, mBlock->nItem);
  return code;
}

H
Hongze Cheng 已提交
457
// Append the encoded SSstBlk entries of `aSstBlk` to the last sst file.
//
// With an empty array nothing is written and the sst file's offset is set to
// its current size. Otherwise the entries are sized, encoded back to back into
// pWriter->aBuf[0], written at the current end of the sst file, and the file's
// offset/size are updated to describe the new region.
//
// `size` starts at 0 so that the trace at _exit never reads an indeterminate
// value on the empty-array path.
//
// Returns 0 on success; allocation or write failures are logged and returned.
int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk) {
  int32_t   code = 0;
  SSstFile *pSstFile = &pWriter->fSst[pWriter->wSet.nSstF - 1];
  int64_t   size = 0;
  int64_t   n = 0;

  // check
  if (taosArrayGetSize(aSstBlk) == 0) {
    pSstFile->offset = pSstFile->size;
    goto _exit;
  }

  // size
  for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aSstBlk); iBlockL++) {
    size += tPutSstBlk(NULL, taosArrayGet(aSstBlk, iBlockL));
  }

  // alloc
  code = tRealloc(&pWriter->aBuf[0], size);
  if (code) goto _err;

  // encode
  for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aSstBlk); iBlockL++) {
    n += tPutSstBlk(pWriter->aBuf[0] + n, taosArrayGet(aSstBlk, iBlockL));
  }
  ASSERT(n == size);  // sizing pass and encoding pass must agree

  // write
  code = tsdbWriteFile(pWriter->pSstFD, pSstFile->size, pWriter->aBuf[0], size);
  if (code) goto _err;

  // update: the region begins where the file used to end
  pSstFile->offset = pSstFile->size;
  pSstFile->size += size;

_exit:
  tsdbTrace("vgId:%d tsdb write sst block, loffset:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode),
            pSstFile->offset, size);
  return code;

_err:
  tsdbError("vgId:%d tsdb write blockl failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
503
// Compute the per-column aggregates of `pBlockData` and append them to the sma
// file, reporting the written region in `pSmaInfo`.
//
// Columns with smaOn unset or of a variable-length type are skipped. For every
// remaining column the aggregate is computed with tsdbCalcColDataSMA() and
// encoded at the tail of pWriter->aBuf[0]. If at least one aggregate was
// produced the buffer is written at the current end of the sma file,
// pSmaInfo->offset is set to that position and fSma.size grows accordingly.
// If none was produced, pSmaInfo->offset and ->size stay 0 and nothing is
// written.
static int32_t tsdbWriteBlockSma(SDataFWriter *pWriter, SBlockData *pBlockData, SSmaInfo *pSmaInfo) {
  int32_t code = 0;

  pSmaInfo->offset = 0;
  pSmaInfo->size = 0;

  // encode one aggregate per eligible column
  for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
    SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);

    if (pColData->smaOn && !IS_VAR_DATA_TYPE(pColData->type)) {
      SColumnDataAgg sma;
      tsdbCalcColDataSMA(pColData, &sma);

      // grow the scratch buffer by exactly the encoded size of this aggregate
      code = tRealloc(&pWriter->aBuf[0], pSmaInfo->size + tPutColumnDataAgg(NULL, &sma));
      if (code) goto _err;
      pSmaInfo->size += tPutColumnDataAgg(pWriter->aBuf[0] + pSmaInfo->size, &sma);
    }
  }

  if (pSmaInfo->size == 0) {
    // no eligible column: nothing to write
    return code;
  }

  // kept from the original: re-request the final size before writing
  code = tRealloc(&pWriter->aBuf[0], pSmaInfo->size);
  if (code) goto _err;

  code = tsdbWriteFile(pWriter->pSmaFD, pWriter->fSma.size, pWriter->aBuf[0], pSmaInfo->size);
  if (code) goto _err;

  pSmaInfo->offset = pWriter->fSma.size;
  pWriter->fSma.size += pSmaInfo->size;

  return code;

_err:
  tsdbError("vgId:%d tsdb write block sma failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
542 543
// Compress `pBlockData` and append it to the data file, or to the last sst
// file when `toLast` is non-zero; optionally also write its column aggregates.
//
// pBlkInfo->offset is set to the current end of the target file. After
// tCmprBlockData() fills pWriter->aBuf[0..3] (lengths in aBufN[0..3]) the
// buffers are written consecutively in the order 3, 2, 1, 0; buffers 1 and 0
// are skipped when empty. pBlkInfo->szKey is aBufN[3] + aBufN[2] and
// pBlkInfo->szBlock is the sum of all four lengths; the target file's recorded
// size grows by szBlock. When `pSmaInfo` is non-NULL, tsdbWriteBlockSma() is
// called afterwards.
//
// `pBlockData` must contain at least one row (asserted). Returns 0 on success;
// any failure is logged and returned.
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo,
                           int8_t cmprAlg, int8_t toLast) {
  int32_t code = 0;
  int32_t aBufN[4] = {0};

  ASSERT(pBlockData->nRow > 0);

  // the block starts at the current end of the target file
  pBlkInfo->offset = toLast ? pWriter->fSst[pWriter->wSet.nSstF - 1].size : pWriter->fData.size;
  pBlkInfo->szBlock = 0;
  pBlkInfo->szKey = 0;

  code = tCmprBlockData(pBlockData, cmprAlg, NULL, NULL, pWriter->aBuf, aBufN);
  if (code) goto _err;

  STsdbFD *pFD = toLast ? pWriter->pSstFD : pWriter->pDataFD;

  pBlkInfo->szKey = aBufN[3] + aBufN[2];
  pBlkInfo->szBlock = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3];

  // lay the compressed pieces out back to back: 3, 2, then 1 and 0 if present
  static const int8_t aOrder[] = {3, 2, 1, 0};
  int64_t             offset = pBlkInfo->offset;
  for (int32_t i = 0; i < 4; i++) {
    int32_t iBuf = aOrder[i];

    // buffers 1 and 0 are optional and skipped when empty
    if (iBuf < 2 && aBufN[iBuf] == 0) continue;

    code = tsdbWriteFile(pFD, offset, pWriter->aBuf[iBuf], aBufN[iBuf]);
    if (code) goto _err;
    offset += aBufN[iBuf];
  }

  // account for the new block in the target file's recorded size
  if (toLast) {
    pWriter->fSst[pWriter->wSet.nSstF - 1].size += pBlkInfo->szBlock;
  } else {
    pWriter->fData.size += pBlkInfo->szBlock;
  }

  // column aggregates, only when the caller asked for them
  if (pSmaInfo) {
    code = tsdbWriteBlockSma(pWriter, pBlockData, pSmaInfo);
    if (code) goto _err;
  }

  tsdbTrace("vgId:%d tsdb write block data, suid:%" PRId64 " uid:%" PRId64 " nRow:%d, offset:%" PRId64 " size:%d",
            TD_VID(pWriter->pTsdb->pVnode), pBlockData->suid, pBlockData->uid, pBlockData->nRow, pBlkInfo->offset,
            pBlkInfo->szBlock);
  return code;

_err:
  tsdbError("vgId:%d tsdb write block data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  return code;
}
H
Hongze Cheng 已提交
609

H
Hongze Cheng 已提交
610 611 612 613 614 615 616 617
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
  int32_t   code = 0;
  int64_t   n;
  int64_t   size;
  TdFilePtr pOutFD = NULL;  // TODO
  TdFilePtr PInFD = NULL;   // TODO
  char      fNameFrom[TSDB_FILENAME_LEN];
  char      fNameTo[TSDB_FILENAME_LEN];
H
Hongze Cheng 已提交
618

H
Hongze Cheng 已提交
619 620 621
  // head
  tsdbHeadFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pHeadF, fNameFrom);
  tsdbHeadFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pHeadF, fNameTo);
H
Hongze Cheng 已提交
622

H
Hongze Cheng 已提交
623 624 625
  pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (pOutFD == NULL) {
    code = TAOS_SYSTEM_ERROR(errno);
H
Hongze Cheng 已提交
626 627 628
    goto _err;
  }

H
Hongze Cheng 已提交
629 630
  PInFD = taosOpenFile(fNameFrom, TD_FILE_READ);
  if (PInFD == NULL) {
H
Hongze Cheng 已提交
631 632 633 634
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

H
Hongze Cheng 已提交
635
  n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->pHeadF->size);
H
Hongze Cheng 已提交
636 637 638 639
  if (n < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
H
Hongze Cheng 已提交
640 641
  taosCloseFile(&pOutFD);
  taosCloseFile(&PInFD);
H
Hongze Cheng 已提交
642 643

  // data
H
Hongze Cheng 已提交
644 645
  tsdbDataFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pDataF, fNameFrom);
  tsdbDataFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pDataF, fNameTo);
H
Hongze Cheng 已提交
646

H
Hongze Cheng 已提交
647 648
  pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (pOutFD == NULL) {
H
Hongze Cheng 已提交
649 650 651 652
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

H
Hongze Cheng 已提交
653 654
  PInFD = taosOpenFile(fNameFrom, TD_FILE_READ);
  if (PInFD == NULL) {
H
Hongze Cheng 已提交
655 656 657
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
H
Hongze Cheng 已提交
658 659

  n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->pDataF->size);
H
Hongze Cheng 已提交
660 661 662 663
  if (n < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
H
Hongze Cheng 已提交
664 665
  taosCloseFile(&pOutFD);
  taosCloseFile(&PInFD);
H
Hongze Cheng 已提交
666

H
Hongze Cheng 已提交
667 668 669
  // sst
  tsdbSstFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->aSstF[0], fNameFrom);
  tsdbSstFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->aSstF[0], fNameTo);
H
Hongze Cheng 已提交
670

H
Hongze Cheng 已提交
671 672 673 674
  pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (pOutFD == NULL) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
H
Hongze Cheng 已提交
675 676
  }

H
Hongze Cheng 已提交
677 678
  PInFD = taosOpenFile(fNameFrom, TD_FILE_READ);
  if (PInFD == NULL) {
H
Hongze Cheng 已提交
679 680 681 682
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

H
Hongze Cheng 已提交
683 684
  n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->aSstF[0]->size);
  if (n < 0) {
H
Hongze Cheng 已提交
685 686 687
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
H
Hongze Cheng 已提交
688 689
  taosCloseFile(&pOutFD);
  taosCloseFile(&PInFD);
H
Hongze Cheng 已提交
690

H
Hongze Cheng 已提交
691 692 693 694 695 696
  // sma
  tsdbSmaFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pSmaF, fNameFrom);
  tsdbSmaFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pSmaF, fNameTo);

  pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (pOutFD == NULL) {
H
Hongze Cheng 已提交
697 698 699 700
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

H
Hongze Cheng 已提交
701 702
  PInFD = taosOpenFile(fNameFrom, TD_FILE_READ);
  if (PInFD == NULL) {
H
Hongze Cheng 已提交
703 704 705 706
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

H
Hongze Cheng 已提交
707 708 709 710
  n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->pSmaF->size);
  if (n < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
H
Hongze Cheng 已提交
711
  }
H
Hongze Cheng 已提交
712 713 714
  taosCloseFile(&pOutFD);
  taosCloseFile(&PInFD);

H
Hongze Cheng 已提交
715 716 717
  return code;

_err:
H
Hongze Cheng 已提交
718
  tsdbError("vgId:%d, tsdb DFileSet copy failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
719 720 721
  return code;
}

H
Hongze Cheng 已提交
722 723 724 725
// SDataFReader ====================================================
// Open a reader over every file of a data file set (head, data, sma and all
// sst files).
//
// ppReader: out; receives the new reader on success, NULL on failure.
// pTsdb:    tsdb instance the file set belongs to.
// pSet:     file set to open; the reader keeps this pointer (not a copy).
//
// Returns 0 on success, otherwise the error code of the failing step. On
// failure every handle opened so far and the reader itself are released, so
// nothing leaks to the caller.
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet) {
  int32_t       code = 0;
  SDataFReader *pReader = NULL;
  int32_t       szPage = TSDB_DEFAULT_PAGE_SIZE;
  char          fname[TSDB_FILENAME_LEN];

  // alloc (zeroed, so every file handle starts out NULL)
  pReader = (SDataFReader *)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pReader->pTsdb = pTsdb;
  pReader->pSet = pSet;

  // head
  tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
  code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pHeadFD);
  if (code) goto _err;

  // data
  tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
  code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pDataFD);
  if (code) goto _err;

  // sma
  tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
  code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pSmaFD);
  if (code) goto _err;

  // sst
  for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) {
    tsdbSstFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSstF[iSst], fname);
    code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->aSstFD[iSst]);
    if (code) goto _err;
  }

  *ppReader = pReader;
  return code;

_err:
  tsdbError("vgId:%d, tsdb data file reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  if (pReader) {
    // tsdbOpenFile resets its out-pointer to NULL before doing anything, so a
    // non-NULL handle here is one that was opened successfully.
    if (pReader->pHeadFD) tsdbCloseFile(&pReader->pHeadFD);
    if (pReader->pDataFD) tsdbCloseFile(&pReader->pDataFD);
    if (pReader->pSmaFD) tsdbCloseFile(&pReader->pSmaFD);
    for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) {
      if (pReader->aSstFD[iSst]) tsdbCloseFile(&pReader->aSstFD[iSst]);
    }
    taosMemoryFree(pReader);
  }
  *ppReader = NULL;
  return code;
}

// Close a reader created by tsdbDataFReaderOpen and release everything it owns.
//
// ppReader: in/out; *ppReader may be NULL (no-op). Set to NULL on return.
//
// Always returns 0. The previous unreachable `_err` label has been removed.
int32_t tsdbDataFReaderClose(SDataFReader **ppReader) {
  int32_t       code = 0;
  SDataFReader *pReader = *ppReader;

  if (pReader == NULL) return code;

  // head / data / sma: guarded the same way the sst handles are
  if (pReader->pHeadFD) tsdbCloseFile(&pReader->pHeadFD);
  if (pReader->pDataFD) tsdbCloseFile(&pReader->pDataFD);
  if (pReader->pSmaFD) tsdbCloseFile(&pReader->pSmaFD);

  // sst
  for (int32_t iSst = 0; iSst < TSDB_MAX_SST_FILE; iSst++) {
    if (pReader->aSstFD[iSst]) {
      tsdbCloseFile(&pReader->aSstFD[iSst]);
    }
  }

  // scratch buffers
  for (size_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(pReader->aBuf[0]); iBuf++) {
    tFree(pReader->aBuf[iBuf]);
  }

  taosMemoryFree(pReader);
  *ppReader = NULL;
  return code;
}

H
Hongze Cheng 已提交
801
// Load every SBlockIdx entry stored in the head file, from the head file's
// `offset` up to its `size`, into aBlockIdx.
//
// aBlockIdx is cleared first; if the index region is empty it stays empty.
// Returns 0 on success, otherwise an error code (which is also logged).
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) {
  int32_t    code = 0;
  SHeadFile *pHeadF = pReader->pSet->pHeadF;
  int64_t    idxOffset = pHeadF->offset;
  int64_t    idxSize = pHeadF->size - idxOffset;
  int64_t    nDecoded;

  taosArrayClear(aBlockIdx);
  if (idxSize == 0) return code;

  // make room for the raw index region
  code = tRealloc(&pReader->aBuf[0], idxSize);
  if (code) goto _exit;

  // fetch it from the head file
  code = tsdbReadFile(pReader->pHeadFD, idxOffset, pReader->aBuf[0], idxSize);
  if (code) goto _exit;

  // decode entries back to back until the region is consumed
  for (nDecoded = 0; nDecoded < idxSize;) {
    SBlockIdx blockIdx;

    nDecoded += tGetBlockIdx(pReader->aBuf[0] + nDecoded, &blockIdx);
    if (taosArrayPush(aBlockIdx, &blockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _exit;
    }
  }
  ASSERT(nDecoded == idxSize);

_exit:
  if (code) {
    tsdbError("vgId:%d, read block idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
838
// Load every SSstBlk entry of sst file number iSst, from that file's `offset`
// up to its `size`, into aSstBlk.
//
// aSstBlk is cleared first; if the region is empty it stays empty.
// Returns 0 on success, otherwise an error code (which is also logged).
int32_t tsdbReadSstBlk(SDataFReader *pReader, int32_t iSst, SArray *aSstBlk) {
  int32_t   code = 0;
  SSstFile *pSstF = pReader->pSet->aSstF[iSst];
  int64_t   blkOffset = pSstF->offset;
  int64_t   blkSize = pSstF->size - blkOffset;
  int64_t   nDecoded;

  taosArrayClear(aSstBlk);
  if (blkSize == 0) return code;

  // make room for the raw region
  code = tRealloc(&pReader->aBuf[0], blkSize);
  if (code) goto _exit;

  // fetch it from the selected sst file
  code = tsdbReadFile(pReader->aSstFD[iSst], blkOffset, pReader->aBuf[0], blkSize);
  if (code) goto _exit;

  // decode entries back to back until the region is consumed
  for (nDecoded = 0; nDecoded < blkSize;) {
    SSstBlk sstBlk;

    nDecoded += tGetSstBlk(pReader->aBuf[0] + nDecoded, &sstBlk);
    if (taosArrayPush(aSstBlk, &sstBlk) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _exit;
    }
  }
  ASSERT(nDecoded == blkSize);

_exit:
  if (code) {
    tsdbError("vgId:%d read sst blk failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
875 876 877 878
// Read the region of the head file described by pBlockIdx (offset/size) and
// decode it into the map mBlock.
//
// Returns 0 on success, otherwise an error code (which is also logged). A
// negative result from tGetMapData is reported as TSDB_CODE_OUT_OF_MEMORY.
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBlock) {
  int32_t code = 0;
  int64_t mapOffset = pBlockIdx->offset;
  int64_t mapSize = pBlockIdx->size;
  int64_t nDecoded;

  // make room for the raw map
  code = tRealloc(&pReader->aBuf[0], mapSize);
  if (code) goto _exit;

  // fetch it from the head file
  code = tsdbReadFile(pReader->pHeadFD, mapOffset, pReader->aBuf[0], mapSize);
  if (code) goto _exit;

  // decode
  nDecoded = tGetMapData(pReader->aBuf[0], mBlock);
  if (nDecoded < 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }
  ASSERT(nDecoded == mapSize);

_exit:
  if (code) {
    tsdbError("vgId:%d, read block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  }
  return code;
}
H
Hongze Cheng 已提交
902

H
Hongze Cheng 已提交
903 904 905
// Read the SMA region of a data block (pDataBlk->smaInfo offset/size) from
// the sma file and decode it into aColumnDataAgg, one SColumnDataAgg per entry.
//
// The caller must only call this for blocks whose smaInfo.size is > 0
// (asserted). aColumnDataAgg is cleared first.
// Returns 0 on success, otherwise an error code (which is also logged).
int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pDataBlk, SArray *aColumnDataAgg) {
  int32_t   code = 0;
  SSmaInfo *pSma = &pDataBlk->smaInfo;
  int32_t   nDecoded;

  ASSERT(pSma->size > 0);

  taosArrayClear(aColumnDataAgg);

  // make room for the raw region
  code = tRealloc(&pReader->aBuf[0], pSma->size);
  if (code) goto _exit;

  // fetch it from the sma file
  code = tsdbReadFile(pReader->pSmaFD, pSma->offset, pReader->aBuf[0], pSma->size);
  if (code) goto _exit;

  // decode entries back to back until the region is consumed
  for (nDecoded = 0; nDecoded < pSma->size;) {
    SColumnDataAgg colAgg;

    nDecoded += tGetColumnDataAgg(pReader->aBuf[0] + nDecoded, &colAgg);
    if (taosArrayPush(aColumnDataAgg, &colAgg) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _exit;
    }
  }
  ASSERT(nDecoded == pSma->size);

_exit:
  if (code) {
    tsdbError("vgId:%d tsdb read block sma failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  }
  return code;
}
H
Hongze Cheng 已提交
937

H
Hongze Cheng 已提交
938
// Read one (sub-)block described by pBlkInfo from the data file and decode it
// into pBlockData.
//
// As read here, the on-disk block consists of:
//   1. pBlkInfo->szKey bytes at pBlkInfo->offset: SDiskDataHdr, then the
//      compressed uid column (present only when hdr.uid == 0), the version
//      column and the TSKEY column;
//   2. hdr.szBlkCol bytes of SBlockCol descriptors;
//   3. the column payloads, each located at its descriptor's `offset` past
//      the end of part 2.
//
// pBlockData is cleared first; only the columns listed in pBlockData->aIdx are
// decoded. A requested column with no descriptor on disk is filled with NONE,
// a descriptor flagged HAS_NULL is filled with NULL.
// NOTE(review): the merge below presumes both the requested columns and the
// on-disk descriptors are in ascending cid order -- confirm against the writer.
//
// Returns 0 on success, otherwise an error code (which is also logged).
static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo, SBlockData *pBlockData) {
  int32_t      code = 0;
  STsdbFD     *pDataFD;
  SDiskDataHdr hdr;
  uint8_t     *pCur;
  SBlockCol    blockCol = {.cid = 0};
  SBlockCol   *pBlockCol = &blockCol;
  int32_t      nParsed = 0;

  tBlockDataClear(pBlockData);

  pDataFD = pReader->pDataFD;

  // key part: header + uid + version + tskey
  code = tRealloc(&pReader->aBuf[0], pBlkInfo->szKey);
  if (code) goto _exit;

  code = tsdbReadFile(pDataFD, pBlkInfo->offset, pReader->aBuf[0], pBlkInfo->szKey);
  if (code) goto _exit;

  pCur = pReader->aBuf[0] + tGetDiskDataHdr(pReader->aBuf[0], &hdr);

  ASSERT(hdr.delimiter == TSDB_FILE_DLMT);
  ASSERT(pBlockData->suid == hdr.suid);
  ASSERT(pBlockData->uid == hdr.uid);

  pBlockData->nRow = hdr.nRow;

  // uid column: stored only when the header carries no single uid
  if (hdr.uid != 0) {
    ASSERT(!hdr.szUid);
  } else {
    ASSERT(hdr.szUid);
    code = tsdbDecmprData(pCur, hdr.szUid, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aUid,
                          sizeof(int64_t) * hdr.nRow, &pReader->aBuf[1]);
    if (code) goto _exit;
  }
  pCur += hdr.szUid;

  // version column
  code = tsdbDecmprData(pCur, hdr.szVer, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aVersion,
                        sizeof(int64_t) * hdr.nRow, &pReader->aBuf[1]);
  if (code) goto _exit;
  pCur += hdr.szVer;

  // TSKEY column
  code = tsdbDecmprData(pCur, hdr.szKey, TSDB_DATA_TYPE_TIMESTAMP, hdr.cmprAlg, (uint8_t **)&pBlockData->aTSKEY,
                        sizeof(TSKEY) * hdr.nRow, &pReader->aBuf[1]);
  if (code) goto _exit;
  pCur += hdr.szKey;

  // the key part must have been consumed exactly
  ASSERT(pCur - pReader->aBuf[0] == pBlkInfo->szKey);

  // nothing more to do when no data column was requested
  if (taosArrayGetSize(pBlockData->aIdx) == 0) goto _exit;

  // column descriptors (aBuf[0] is reused; the key part is no longer needed)
  if (hdr.szBlkCol > 0) {
    int64_t descOffset = pBlkInfo->offset + pBlkInfo->szKey;

    code = tRealloc(&pReader->aBuf[0], hdr.szBlkCol);
    if (code) goto _exit;

    code = tsdbReadFile(pDataFD, descOffset, pReader->aBuf[0], hdr.szBlkCol);
    if (code) goto _exit;
  }

  for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
    SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);

    // advance through the descriptors until one with cid >= the wanted cid is
    // found, or the descriptor region is exhausted (pBlockCol becomes NULL)
    while (pBlockCol != NULL && pBlockCol->cid < pColData->cid) {
      if (nParsed >= hdr.szBlkCol) {
        ASSERT(nParsed == hdr.szBlkCol);
        pBlockCol = NULL;
        break;
      }
      nParsed += tGetBlockCol(pReader->aBuf[0] + nParsed, pBlockCol);
    }

    // column absent on disk: every row is NONE
    if (pBlockCol == NULL || pBlockCol->cid > pColData->cid) {
      for (int32_t iRow = 0; iRow < hdr.nRow; iRow++) {
        code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
        if (code) goto _exit;
      }
      continue;
    }

    ASSERT(pBlockCol->type == pColData->type);
    ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE);

    // column is all-NULL: no payload to read
    if (pBlockCol->flag == HAS_NULL) {
      for (int32_t iRow = 0; iRow < hdr.nRow; iRow++) {
        code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type));
        if (code) goto _exit;
      }
      continue;
    }

    // column has a payload: read bitmap + offsets + values and decompress
    {
      int64_t colOffset = pBlkInfo->offset + pBlkInfo->szKey + hdr.szBlkCol + pBlockCol->offset;
      int32_t colSize = pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue;

      code = tRealloc(&pReader->aBuf[1], colSize);
      if (code) goto _exit;

      code = tsdbReadFile(pDataFD, colOffset, pReader->aBuf[1], colSize);
      if (code) goto _exit;

      code = tsdbDecmprColData(pReader->aBuf[1], pBlockCol, hdr.cmprAlg, hdr.nRow, pColData, &pReader->aBuf[2]);
      if (code) goto _exit;
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d tsdb read block data impl failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  }
  return code;
}
H
Hongze Cheng 已提交
1055

H
Hongze Cheng 已提交
1056 1057
// Read a complete data block into pBlockData.
//
// The first sub-block is read directly into pBlockData. When the block has
// more than one sub-block, each further sub-block is read into a temporary,
// the current content of pBlockData is copied aside, and the two are merged
// back into pBlockData.
//
// Returns 0 on success, otherwise an error code (which is also logged). Both
// temporaries are released on every path, including the case where creating
// the second temporary fails.
int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) {
  int32_t code = 0;

  code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[0], pBlockData);
  if (code) goto _err;

  if (pDataBlk->nSubBlock > 1) {
    SBlockData bData1;
    SBlockData bData2;

    // create
    code = tBlockDataCreate(&bData1);
    if (code) goto _err;
    code = tBlockDataCreate(&bData2);
    if (code) {
      // bData1 was created successfully above and must not be leaked
      tBlockDataDestroy(&bData1, 1);
      goto _err;
    }

    // init
    tBlockDataInitEx(&bData1, pBlockData);
    tBlockDataInitEx(&bData2, pBlockData);

    for (int32_t iSubBlock = 1; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) {
      // next sub-block -> bData1
      code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[iSubBlock], &bData1);
      if (code) break;

      // rows accumulated so far -> bData2
      code = tBlockDataCopy(pBlockData, &bData2);
      if (code) break;

      // merge both back into the output
      code = tBlockDataMerge(&bData1, &bData2, pBlockData);
      if (code) break;
    }

    // single cleanup point for success and failure alike
    tBlockDataDestroy(&bData1, 1);
    tBlockDataDestroy(&bData2, 1);
    if (code) goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d tsdb read data block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  return code;
}
H
Hongze Cheng 已提交
1109

H
Hongze Cheng 已提交
1110
// Read the block described by pSstBlk->bInfo (offset/szBlock) from sst file
// number iSst and decompress it into pBlockData.
//
// Returns 0 on success, otherwise an error code (which is also logged).
int32_t tsdbReadSstBlock(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk, SBlockData *pBlockData) {
  int32_t code;

  // make room for the compressed block
  code = tRealloc(&pReader->aBuf[0], pSstBlk->bInfo.szBlock);
  if (code == 0) {
    // fetch it from the selected sst file
    code = tsdbReadFile(pReader->aSstFD[iSst], pSstBlk->bInfo.offset, pReader->aBuf[0], pSstBlk->bInfo.szBlock);
  }
  if (code == 0) {
    // decompress, using aBuf[1] as scratch space
    code = tDecmprBlockData(pReader->aBuf[0], pSstBlk->bInfo.szBlock, pBlockData, &pReader->aBuf[1]);
  }

  if (code) {
    tsdbError("vgId:%d tsdb read sst block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  }
  return code;
}

// SDelFWriter ====================================================
// Create a writer for a del file.
//
// ppWriter: out; receives the new writer on success, NULL on failure.
// pFile:    del-file descriptor; copied by value into the writer.
// pTsdb:    tsdb instance the file belongs to.
//
// The file is opened read/write (created if missing) and a zeroed header of
// TSDB_FHDR_SIZE bytes is written at offset 0; the writer's fDel.size is set
// to TSDB_FHDR_SIZE and fDel.offset to 0.
//
// Returns 0 on success, otherwise an error code (which is also logged). On
// failure the file handle (if opened) and the writer are released.
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb) {
  int32_t      code = 0;
  char         fname[TSDB_FILENAME_LEN];
  uint8_t      hdr[TSDB_FHDR_SIZE] = {0};
  SDelFWriter *pDelFWriter = NULL;

  // alloc (zeroed, so pWriteH starts out NULL)
  pDelFWriter = (SDelFWriter *)taosMemoryCalloc(1, sizeof(*pDelFWriter));
  if (pDelFWriter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pDelFWriter->pTsdb = pTsdb;
  pDelFWriter->fDel = *pFile;

  tsdbDelFileName(pTsdb, pFile, fname);
  code =
      tsdbOpenFile(fname, TSDB_DEFAULT_PAGE_SIZE, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE, &pDelFWriter->pWriteH);
  if (code) goto _err;

  // reserve the header area; the real header is written later
  code = tsdbWriteFile(pDelFWriter->pWriteH, 0, hdr, TSDB_FHDR_SIZE);
  if (code) goto _err;

  pDelFWriter->fDel.size = TSDB_FHDR_SIZE;
  pDelFWriter->fDel.offset = 0;

  *ppWriter = pDelFWriter;
  return code;

_err:
  tsdbError("vgId:%d, failed to open del file writer since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  if (pDelFWriter) {
    // tsdbOpenFile resets its out-pointer to NULL first, so non-NULL means open
    if (pDelFWriter->pWriteH) tsdbCloseFile(&pDelFWriter->pWriteH);
    taosMemoryFree(pDelFWriter);
  }
  *ppWriter = NULL;
  return code;
}

// Close a del-file writer and release everything it owns.
//
// ppWriter: in/out; must point to a valid writer. Set to NULL on success.
// sync:     when non-zero the file is fsync'ed before being closed.
//
// Returns 0 on success. If the fsync fails its error code is logged and
// returned, and the writer is left open and untouched.
int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync) {
  int32_t      code = 0;
  SDelFWriter *pDelFWriter = *ppWriter;
  STsdb       *pTsdb = pDelFWriter->pTsdb;
  int32_t      nBuf = sizeof(pDelFWriter->aBuf) / sizeof(uint8_t *);

  // flush to disk first, if requested
  if (sync) {
    code = tsdbFsyncFile(pDelFWriter->pWriteH);
    if (code) {
      tsdbError("vgId:%d, failed to close del file writer since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
      return code;
    }
  }

  // release the file handle, the scratch buffers and the writer itself
  tsdbCloseFile(&pDelFWriter->pWriteH);

  for (int32_t iBuf = 0; iBuf < nBuf; iBuf++) {
    tFree(pDelFWriter->aBuf[iBuf]);
  }
  taosMemoryFree(pDelFWriter);

  *ppWriter = NULL;
  return code;
}

// Encode every element of aDelData and append the result to the del file.
//
// On success pDelIdx->offset / pDelIdx->size describe where the encoded data
// was written, and the writer's fDel.size grows by that amount.
// Returns 0 on success, otherwise an error code (which is also logged).
int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, SDelIdx *pDelIdx) {
  int32_t code = 0;
  int64_t nTotal = 0;
  int64_t nEncoded = 0;

  // first pass: measure the encoded size
  for (int32_t i = 0; i < taosArrayGetSize(aDelData); i++) {
    nTotal += tPutDelData(NULL, taosArrayGet(aDelData, i));
  }

  // make room
  code = tRealloc(&pWriter->aBuf[0], nTotal);
  if (code) goto _exit;

  // second pass: encode into the buffer
  for (int32_t i = 0; i < taosArrayGetSize(aDelData); i++) {
    nEncoded += tPutDelData(pWriter->aBuf[0] + nEncoded, taosArrayGet(aDelData, i));
  }
  ASSERT(nEncoded == nTotal);

  // append at the current end of the file
  code = tsdbWriteFile(pWriter->pWriteH, pWriter->fDel.size, pWriter->aBuf[0], nTotal);
  if (code) goto _exit;

  // record where the data went and advance the file size
  pDelIdx->offset = pWriter->fDel.size;
  pDelIdx->size = nTotal;
  pWriter->fDel.size += nTotal;

_exit:
  if (code) {
    tsdbError("vgId:%d, failed to write del data since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  }
  return code;
}

// Encode every SDelIdx in aDelIdx and append the result to the del file.
//
// On success the writer's fDel.offset is set to the position where the index
// was written and fDel.size grows by the encoded size.
// Returns 0 on success, otherwise an error code (which is also logged).
int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SArray *aDelIdx) {
  int32_t code = 0;
  int64_t size = 0;
  int64_t n = 0;
  int32_t nDelIdx = taosArrayGetSize(aDelIdx);  // the array is not modified here, so read its size once

  // prepare: measure the encoded size
  for (int32_t iDelIdx = 0; iDelIdx < nDelIdx; iDelIdx++) {
    size += tPutDelIdx(NULL, taosArrayGet(aDelIdx, iDelIdx));
  }

  // alloc
  code = tRealloc(&pWriter->aBuf[0], size);
  if (code) goto _err;

  // build
  for (int32_t iDelIdx = 0; iDelIdx < nDelIdx; iDelIdx++) {
    n += tPutDelIdx(pWriter->aBuf[0] + n, taosArrayGet(aDelIdx, iDelIdx));
  }
  ASSERT(n == size);

  // write at the current end of the file
  code = tsdbWriteFile(pWriter->pWriteH, pWriter->fDel.size, pWriter->aBuf[0], size);
  if (code) goto _err;

  // update
  pWriter->fDel.offset = pWriter->fDel.size;
  pWriter->fDel.size += size;

  return code;

_err:
  tsdbError("vgId:%d, write del idx failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  return code;
}

// Encode the writer's current SDelFile descriptor into a TSDB_FHDR_SIZE-byte
// header and write it at offset 0 of the del file.
//
// Returns 0 on success, otherwise an error code (which is also logged).
int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter) {
  int32_t code = 0;
  // byte buffer, the same type tsdbDelFWriterOpen passes to tsdbWriteFile
  uint8_t hdr[TSDB_FHDR_SIZE] = {0};
  int64_t size = TSDB_FHDR_SIZE;

  // build (unused tail of the header stays zero)
  tPutDelFile(hdr, &pWriter->fDel);

  // write
  code = tsdbWriteFile(pWriter->pWriteH, 0, hdr, size);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d, update del file hdr failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  return code;
}
H
Hongze Cheng 已提交
1292 1293
// SDelFReader ====================================================
struct SDelFReader {
H
Hongze Cheng 已提交
1294 1295 1296
  STsdb   *pTsdb;
  SDelFile fDel;
  STsdbFD *pReadH;
H
Hongze Cheng 已提交
1297 1298
  uint8_t *aBuf[1];
};
H
Hongze Cheng 已提交
1299

H
Hongze Cheng 已提交
1300 1301 1302 1303 1304
// Create a reader for a del file.
//
// ppReader: out; receives the new reader on success, NULL on failure.
// pFile:    del-file descriptor; copied by value into the reader.
// pTsdb:    tsdb instance the file belongs to.
//
// Returns 0 on success, otherwise an error code (which is also logged). On
// failure the partially built reader is released.
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb) {
  int32_t      code = 0;
  char         fname[TSDB_FILENAME_LEN];
  SDelFReader *pDelFReader = NULL;

  // alloc
  pDelFReader = (SDelFReader *)taosMemoryCalloc(1, sizeof(*pDelFReader));
  if (pDelFReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // open impl
  pDelFReader->pTsdb = pTsdb;
  pDelFReader->fDel = *pFile;

  tsdbDelFileName(pTsdb, pFile, fname);
  code = tsdbOpenFile(fname, TSDB_DEFAULT_PAGE_SIZE, TD_FILE_READ, &pDelFReader->pReadH);
  if (code) goto _err;

  *ppReader = pDelFReader;
  return code;

_err:
  tsdbError("vgId:%d, del file reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  // Opening the file is the last fallible step, so there is never an open
  // handle to close here; only the reader itself has to be freed.
  if (pDelFReader) taosMemoryFree(pDelFReader);
  *ppReader = NULL;
  return code;
}

H
Hongze Cheng 已提交
1330 1331 1332
// Close a del-file reader and release everything it owns.
//
// ppReader: in/out; *ppReader may be NULL. Always set to NULL on return.
// Always returns 0.
int32_t tsdbDelFReaderClose(SDelFReader **ppReader) {
  int32_t      code = 0;
  SDelFReader *pDelFReader = *ppReader;

  // nothing to release
  if (pDelFReader == NULL) {
    *ppReader = NULL;
    return code;
  }

  // file handle, scratch buffers, then the reader itself
  tsdbCloseFile(&pDelFReader->pReadH);
  for (int32_t iBuf = 0; iBuf < sizeof(pDelFReader->aBuf) / sizeof(uint8_t *); iBuf++) {
    tFree(pDelFReader->aBuf[iBuf]);
  }
  taosMemoryFree(pDelFReader);

  *ppReader = NULL;
  return code;
}

H
Hongze Cheng 已提交
1347
// Read the region of the del file described by pDelIdx (offset/size) and
// decode it into aDelData, one SDelData per entry.
//
// aDelData is cleared first.
// Returns 0 on success, otherwise an error code (which is also logged).
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData) {
  int32_t code = 0;
  int64_t dataOffset = pDelIdx->offset;
  int64_t dataSize = pDelIdx->size;
  int64_t nDecoded;

  taosArrayClear(aDelData);

  // make room for the raw region
  code = tRealloc(&pReader->aBuf[0], dataSize);
  if (code) goto _exit;

  // fetch it from the del file
  code = tsdbReadFile(pReader->pReadH, dataOffset, pReader->aBuf[0], dataSize);
  if (code) goto _exit;

  // decode entries back to back until the region is consumed
  for (nDecoded = 0; nDecoded < dataSize;) {
    SDelData delData;

    nDecoded += tGetDelData(pReader->aBuf[0] + nDecoded, &delData);
    if (taosArrayPush(aDelData, &delData) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _exit;
    }
  }
  ASSERT(nDecoded == dataSize);

_exit:
  if (code) {
    tsdbError("vgId:%d, read del data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
1383
// Load every SDelIdx entry stored in the del file, from the file's `offset`
// up to its `size`, into aDelIdx.
//
// aDelIdx is cleared first.
// Returns 0 on success, otherwise an error code (which is also logged).
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) {
  int32_t code = 0;
  int64_t n;  // 64-bit like `size`, so the cursor cannot wrap before reaching it
  int64_t offset = pReader->fDel.offset;
  int64_t size = pReader->fDel.size - offset;

  taosArrayClear(aDelIdx);

  // alloc
  code = tRealloc(&pReader->aBuf[0], size);
  if (code) goto _err;

  // read
  code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size);
  if (code) goto _err;

  // decode entries back to back until the region is consumed
  n = 0;
  while (n < size) {
    SDelIdx delIdx;

    n += tGetDelIdx(pReader->aBuf[0] + n, &delIdx);

    if (taosArrayPush(aDelIdx, &delIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }

  ASSERT(n == size);

  return code;

_err:
  tsdbError("vgId:%d, read del idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  return code;
}