tsdbReaderWriter.c 37.7 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
// =============== PAGE-WISE FILE ===============
H
Hongze Cheng 已提交
19 20 21 22 23 24
// Payload bytes available in one page: the page size minus the space taken by a TSCKSUM.
#define PAGE_CONTENT_SIZE(PAGE) ((PAGE) - sizeof(TSCKSUM))
// Convert a logical offset (counting payload bytes only) into a physical file offset
// (counting whole pages of size PAGE).
#define LOGIC_TO_FILE_OFFSET(OFFSET, PAGE) \
  ((OFFSET) / PAGE_CONTENT_SIZE(PAGE) * (PAGE) + (OFFSET) % PAGE_CONTENT_SIZE(PAGE))
// Convert a physical file offset back into a logical (payload-only) offset.
#define FILE_TO_LOGIC_OFFSET(OFFSET, PAGE) ((OFFSET) / (PAGE)*PAGE_CONTENT_SIZE(PAGE) + (OFFSET) % (PAGE))
// Physical byte offset at which page PGNO starts; page numbers are 1-based.
#define PAGE_OFFSET(PGNO, PAGE)            (((PGNO)-1) * (PAGE))
// 1-based number of the page that contains physical file offset OFFSET.
#define OFFSET_PGNO(OFFSET, PAGE)          ((OFFSET) / (PAGE) + 1)
H
Hongze Cheng 已提交
25 26

/*
 * Open `path` as a page-wise file and return a new descriptor through `ppFD`.
 *
 * path   - file to open; a private copy is stored right behind the descriptor.
 * szPage - physical page size in bytes; also the size of the page buffer.
 * flag   - open flags forwarded to taosOpenFile().
 * ppFD   - receives the descriptor on success and NULL on failure.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when an allocation fails, or
 * TAOS_SYSTEM_ERROR(errno) when opening or stat-ing the file fails. On any
 * failure every resource acquired so far is released.
 */
static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD) {
  int32_t  code = 0;
  STsdbFD *pFD = NULL;

  *ppFD = NULL;

  // the descriptor and its path string share one allocation
  pFD = (STsdbFD *)taosMemoryCalloc(1, sizeof(*pFD) + strlen(path) + 1);
  if (pFD == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  pFD->path = (char *)&pFD[1];
  strcpy(pFD->path, path);
  pFD->szPage = szPage;
  pFD->flag = flag;
  pFD->pgno = 0;  // no page is buffered yet

  pFD->pFD = taosOpenFile(path, flag);
  if (pFD->pFD == NULL) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  pFD->pBuf = taosMemoryMalloc(szPage);
  if (pFD->pBuf == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  if (taosStatFile(path, &pFD->szFile, NULL) < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  // szFile is kept in pages, not bytes
  ASSERT(pFD->szFile % szPage == 0);
  pFD->szFile = pFD->szFile / szPage;

  *ppFD = pFD;
  return code;

_err:
  // release whatever was acquired before the failure
  if (pFD->pFD != NULL) {
    taosCloseFile(&pFD->pFD);
  }
  if (pFD->pBuf != NULL) {
    taosMemoryFree(pFD->pBuf);
  }
  taosMemoryFree(pFD);

_exit:
  return code;
}
H
Hongze Cheng 已提交
66

H
Hongze Cheng 已提交
67 68
/*
 * Release a descriptor created by tsdbOpenFile(): frees the page buffer,
 * closes the underlying file, frees the descriptor and sets *ppFD to NULL.
 * The buffered page is NOT written back here. A NULL *ppFD is a no-op.
 */
static void tsdbCloseFile(STsdbFD **ppFD) {
  STsdbFD *pFD = *ppFD;

  if (pFD == NULL) {
    return;
  }

  taosMemoryFree(pFD->pBuf);
  taosCloseFile(&pFD->pFD);
  taosMemoryFree(pFD);
  *ppFD = NULL;
}

H
Hongze Cheng 已提交
75
/*
 * Write the currently buffered page (if any) to disk.
 *
 * When pFD->pgno > 0 the buffer is checksummed and written at that page's
 * offset, and the cached page count pFD->szFile is grown if the page lies
 * beyond the previously known end of file. On success pFD->pgno is reset to 0,
 * meaning "no page buffered".
 *
 * Returns 0 on success or TAOS_SYSTEM_ERROR(errno) if seek/write fails; on
 * failure pFD->pgno is left unchanged.
 */
static int32_t tsdbWriteFilePage(STsdbFD *pFD) {
  int32_t code = 0;

  if (pFD->pgno > 0) {
    int64_t n = taosLSeekFile(pFD->pFD, PAGE_OFFSET(pFD->pgno, pFD->szPage), SEEK_SET);
    if (n < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      goto _exit;
    }

    taosCalcChecksumAppend(0, pFD->pBuf, pFD->szPage);

    n = taosWriteFile(pFD->pFD, pFD->pBuf, pFD->szPage);
    if (n < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      goto _exit;
    }

    // the file now holds at least pgno pages
    if (pFD->szFile < pFD->pgno) {
      pFD->szFile = pFD->pgno;
    }
  }
  pFD->pgno = 0;

_exit:
  return code;
}

H
Hongze Cheng 已提交
103
/*
 * Load page `pgno` from disk into pFD->pBuf and verify its checksum.
 *
 * Returns 0 on success (pFD->pgno is then set to `pgno`),
 * TAOS_SYSTEM_ERROR(errno) if seek/read fails, or TSDB_CODE_FILE_CORRUPTED on
 * a short read or checksum mismatch. pFD->pgno is untouched on failure.
 */
static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno) {
  ASSERT(pgno <= pFD->szFile);

  // position at the first byte of the requested page
  int64_t pageStart = PAGE_OFFSET(pgno, pFD->szPage);
  if (taosLSeekFile(pFD->pFD, pageStart, SEEK_SET) < 0) {
    return TAOS_SYSTEM_ERROR(errno);
  }

  // a page must be read in full
  int64_t nGot = taosReadFile(pFD->pFD, pFD->pBuf, pFD->szPage);
  if (nGot < 0) {
    return TAOS_SYSTEM_ERROR(errno);
  }
  if (nGot < pFD->szPage) {
    return TSDB_CODE_FILE_CORRUPTED;
  }

  // validate the page checksum
  if (!taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
    return TSDB_CODE_FILE_CORRUPTED;
  }

  pFD->pgno = pgno;
  return 0;
}
H
Hongze Cheng 已提交
137

H
Hongze Cheng 已提交
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
/*
 * Write `size` bytes from `pBuf` at logical offset `offset` through the
 * single-page buffer of `pFD`.
 *
 * Whenever the target page differs from the buffered one, the buffered page
 * is written out first. If the target page already exists in the file
 * (1-based pgno <= szFile) it is read in so that bytes outside the written
 * range are preserved; otherwise the buffer is simply re-labelled as the new
 * page. The last touched page stays buffered and is not written to disk here.
 *
 * Returns 0 on success or the error code from the page write/read helpers.
 */
static int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
  int32_t code = 0;
  int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
  int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
  int64_t bOffset = fOffset % pFD->szPage;
  int64_t n = 0;

  do {
    if (pFD->pgno != pgno) {
      code = tsdbWriteFilePage(pFD);
      if (code) goto _exit;

      // pages are 1-based, so page szFile is the last page that exists on disk
      if (pgno <= pFD->szFile) {
        code = tsdbReadFilePage(pFD, pgno);
        if (code) goto _exit;
      } else {
        pFD->pgno = pgno;
      }
    }

    // copy as much as fits into the payload area of the current page
    int64_t nWrite = TMIN(PAGE_CONTENT_SIZE(pFD->szPage) - bOffset, size - n);
    memcpy(pFD->pBuf + bOffset, pBuf + n, nWrite);

    pgno++;
    bOffset = 0;  // subsequent pages are filled from their start
    n += nWrite;
  } while (n < size);

_exit:
  return code;
}

H
Hongze Cheng 已提交
170
/*
 * Read `size` bytes starting at logical offset `offset` into `pBuf`.
 *
 * The first page is taken from the page buffer when it is already loaded,
 * otherwise it is read from disk; either way copying starts at the in-page
 * offset derived from `offset`. Following pages are read one by one and
 * copied from their beginning.
 *
 * Returns 0 on success or the error code from tsdbReadFilePage().
 */
static int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
  int32_t code = 0;
  int64_t n = 0;
  int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
  int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
  int64_t bOffset = fOffset % pFD->szPage;
  int32_t szPgCont = PAGE_CONTENT_SIZE(pFD->szPage);

  ASSERT(pgno && pgno <= pFD->szFile);
  ASSERT(bOffset < szPgCont);

  while (n < size) {
    // reuse the buffered page when possible, otherwise load it
    if (pFD->pgno != pgno) {
      code = tsdbReadFilePage(pFD, pgno);
      if (code) goto _exit;
    }

    int64_t nRead = TMIN(szPgCont - bOffset, size - n);
    memcpy(pBuf + n, pFD->pBuf + bOffset, nRead);

    n += nRead;
    pgno++;
    bOffset = 0;  // only the first page starts mid-page
  }

_exit:
  return code;
}

H
Hongze Cheng 已提交
204
/*
 * Write out the buffered page and then fsync the underlying file.
 * Returns 0 on success, the page-write error, or TAOS_SYSTEM_ERROR(errno)
 * when the fsync itself fails.
 */
static int32_t tsdbFsyncFile(STsdbFD *pFD) {
  int32_t rc = tsdbWriteFilePage(pFD);

  // only sync when the page write succeeded
  if (rc == 0 && taosFsyncFile(pFD->pFD) < 0) {
    rc = TAOS_SYSTEM_ERROR(errno);
  }

  return rc;
}

H
Hongze Cheng 已提交
219 220
// SDataFWriter ====================================================
/*
 * Create a writer for the file set described by `pSet`.
 *
 * The writer keeps private copies of the head/data/sma/sst file descriptors
 * of `pSet` and points its own wSet at those copies. Files are opened as
 * follows:
 *   - head: always created/truncated, a zeroed header is written;
 *   - data, sma: created/truncated with a zeroed header when their recorded
 *     size is 0, otherwise opened read/write as they are;
 *   - last sst file: must have size 0 (asserted); created/truncated with a
 *     zeroed header.
 * Each time a header is written the corresponding file size is advanced by
 * TSDB_FHDR_SIZE.
 *
 * On success *ppWriter receives the writer and 0 is returned. On failure the
 * error is logged, all files opened so far are closed, the writer is freed,
 * *ppWriter is set to NULL and the error code is returned.
 */
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet) {
  int32_t       code = 0;
  int32_t       flag;
  int32_t       szPage = TSDB_DEFAULT_PAGE_SIZE;
  SDataFWriter *pWriter = NULL;
  char          fname[TSDB_FILENAME_LEN];
  uint8_t       hdr[TSDB_FHDR_SIZE] = {0};

  // alloc
  pWriter = taosMemoryCalloc(1, sizeof(*pWriter));
  if (pWriter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pWriter->pTsdb = pTsdb;
  pWriter->wSet = (SDFileSet){.diskId = pSet->diskId,
                              .fid = pSet->fid,
                              .pHeadF = &pWriter->fHead,
                              .pDataF = &pWriter->fData,
                              .pSmaF = &pWriter->fSma,
                              .nSstF = pSet->nSstF};
  pWriter->fHead = *pSet->pHeadF;
  pWriter->fData = *pSet->pDataF;
  pWriter->fSma = *pSet->pSmaF;
  for (int8_t iSst = 0; iSst < pSet->nSstF; iSst++) {
    pWriter->wSet.aSstF[iSst] = &pWriter->fSst[iSst];
    pWriter->fSst[iSst] = *pSet->aSstF[iSst];
  }

  // head
  flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
  tsdbHeadFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fHead, fname);
  code = tsdbOpenFile(fname, szPage, flag, &pWriter->pHeadFD);
  if (code) goto _err;

  code = tsdbWriteFile(pWriter->pHeadFD, 0, hdr, TSDB_FHDR_SIZE);
  if (code) goto _err;
  pWriter->fHead.size += TSDB_FHDR_SIZE;

  // data
  if (pWriter->fData.size == 0) {
    flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
  } else {
    flag = TD_FILE_READ | TD_FILE_WRITE;
  }
  tsdbDataFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fData, fname);
  code = tsdbOpenFile(fname, szPage, flag, &pWriter->pDataFD);
  if (code) goto _err;
  if (pWriter->fData.size == 0) {
    code = tsdbWriteFile(pWriter->pDataFD, 0, hdr, TSDB_FHDR_SIZE);
    if (code) goto _err;
    pWriter->fData.size += TSDB_FHDR_SIZE;
  }

  // sma
  if (pWriter->fSma.size == 0) {
    flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
  } else {
    flag = TD_FILE_READ | TD_FILE_WRITE;
  }
  tsdbSmaFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSma, fname);
  code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSmaFD);
  if (code) goto _err;
  if (pWriter->fSma.size == 0) {
    code = tsdbWriteFile(pWriter->pSmaFD, 0, hdr, TSDB_FHDR_SIZE);
    if (code) goto _err;

    pWriter->fSma.size += TSDB_FHDR_SIZE;
  }

  // sst
  ASSERT(pWriter->fSst[pSet->nSstF - 1].size == 0);
  flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
  tsdbSstFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSst[pSet->nSstF - 1], fname);
  code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSstFD);
  if (code) goto _err;
  code = tsdbWriteFile(pWriter->pSstFD, 0, hdr, TSDB_FHDR_SIZE);
  if (code) goto _err;
  pWriter->fSst[pWriter->wSet.nSstF - 1].size += TSDB_FHDR_SIZE;

  *ppWriter = pWriter;
  return code;

_err:
  tsdbError("vgId:%d, tsdb data file writer open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  // undo partial construction: the writer was zero-allocated, so a NULL
  // descriptor means that file was never opened
  if (pWriter != NULL) {
    if (pWriter->pHeadFD != NULL) tsdbCloseFile(&pWriter->pHeadFD);
    if (pWriter->pDataFD != NULL) tsdbCloseFile(&pWriter->pDataFD);
    if (pWriter->pSmaFD != NULL) tsdbCloseFile(&pWriter->pSmaFD);
    if (pWriter->pSstFD != NULL) tsdbCloseFile(&pWriter->pSstFD);
    taosMemoryFree(pWriter);
  }
  *ppWriter = NULL;
  return code;
}

H
Hongze Cheng 已提交
310
/*
 * Close a writer created by tsdbDataFWriterOpen().
 *
 * When `sync` is non-zero the head, data, sma and sst files are fsynced in
 * that order first; if any fsync fails the error is logged and returned with
 * the writer left open and *ppWriter unchanged. Otherwise all four files are
 * closed, the scratch buffers and the writer are freed, and *ppWriter is set
 * to NULL. A NULL *ppWriter is accepted and yields 0.
 */
int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) {
  int32_t       code = 0;
  SDataFWriter *pWriter = *ppWriter;

  if (pWriter != NULL) {
    if (sync) {
      // stop at the first file that fails to sync
      code = tsdbFsyncFile(pWriter->pHeadFD);
      if (code == 0) code = tsdbFsyncFile(pWriter->pDataFD);
      if (code == 0) code = tsdbFsyncFile(pWriter->pSmaFD);
      if (code == 0) code = tsdbFsyncFile(pWriter->pSstFD);

      if (code != 0) {
        tsdbError("vgId:%d, data file writer close failed since %s", TD_VID(pWriter->pTsdb->pVnode),
                  tstrerror(code));
        return code;
      }
    }

    tsdbCloseFile(&pWriter->pHeadFD);
    tsdbCloseFile(&pWriter->pDataFD);
    tsdbCloseFile(&pWriter->pSmaFD);
    tsdbCloseFile(&pWriter->pSstFD);

    // release every scratch buffer owned by the writer
    for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t *); iBuf++) {
      tFree(pWriter->aBuf[iBuf]);
    }
    taosMemoryFree(pWriter);
  }

  *ppWriter = NULL;
  return code;
}

H
Hongze Cheng 已提交
349 350 351 352
/*
 * Re-encode the head, data, sma and last-sst file descriptors of the writer
 * and write each one as a TSDB_FHDR_SIZE header at logical offset 0 of its
 * file. Stops at the first failing write, logs it and returns its code;
 * returns 0 when all four headers were written.
 */
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter) {
  int32_t code;
  char    hdr[TSDB_FHDR_SIZE];

  // head ==============
  memset(hdr, 0, sizeof(hdr));
  tPutHeadFile(hdr, &pWriter->fHead);
  if ((code = tsdbWriteFile(pWriter->pHeadFD, 0, hdr, TSDB_FHDR_SIZE)) != 0) goto _err;

  // data ==============
  memset(hdr, 0, sizeof(hdr));
  tPutDataFile(hdr, &pWriter->fData);
  if ((code = tsdbWriteFile(pWriter->pDataFD, 0, hdr, TSDB_FHDR_SIZE)) != 0) goto _err;

  // sma ==============
  memset(hdr, 0, sizeof(hdr));
  tPutSmaFile(hdr, &pWriter->fSma);
  if ((code = tsdbWriteFile(pWriter->pSmaFD, 0, hdr, TSDB_FHDR_SIZE)) != 0) goto _err;

  // sst ==============
  memset(hdr, 0, sizeof(hdr));
  tPutSstFile(hdr, &pWriter->fSst[pWriter->wSet.nSstF - 1]);
  if ((code = tsdbWriteFile(pWriter->pSstFD, 0, hdr, TSDB_FHDR_SIZE)) != 0) goto _err;

  return code;

_err:
  tsdbError("vgId:%d, update DFileSet header failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
385 386 387
/*
 * Encode every SBlockIdx in `aBlockIdx` and append the result to the head
 * file at its current logical size.
 *
 * On success the head file's `offset` is set to where the index starts and
 * its `size` is advanced by the encoded length. With an empty array nothing
 * is written and `offset` is set to the current size.
 *
 * Returns 0 on success, or the error from tRealloc()/tsdbWriteFile().
 */
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx) {
  int32_t    code = 0;
  SHeadFile *pHeadFile = &pWriter->fHead;
  int64_t    size = 0;  // stays 0 on the empty path so the trace below is well-defined
  int64_t    n = 0;
  int32_t    nBlockIdx = (int32_t)taosArrayGetSize(aBlockIdx);

  // check
  if (nBlockIdx == 0) {
    pHeadFile->offset = pHeadFile->size;
    goto _exit;
  }

  // prepare: a NULL destination makes tPutBlockIdx return the encoded size only
  for (int32_t iBlockIdx = 0; iBlockIdx < nBlockIdx; iBlockIdx++) {
    size += tPutBlockIdx(NULL, taosArrayGet(aBlockIdx, iBlockIdx));
  }

  // alloc
  code = tRealloc(&pWriter->aBuf[0], size);
  if (code) goto _err;

  // build
  for (int32_t iBlockIdx = 0; iBlockIdx < nBlockIdx; iBlockIdx++) {
    n += tPutBlockIdx(pWriter->aBuf[0] + n, taosArrayGet(aBlockIdx, iBlockIdx));
  }
  ASSERT(n == size);

  // write
  code = tsdbWriteFile(pWriter->pHeadFD, pHeadFile->size, pWriter->aBuf[0], size);
  if (code) goto _err;

  // update
  pHeadFile->offset = pHeadFile->size;
  pHeadFile->size += size;

_exit:
  tsdbTrace("vgId:%d write block idx, offset:%" PRId64 " size:%" PRId64 " nBlockIdx:%d", TD_VID(pWriter->pTsdb->pVnode),
            pHeadFile->offset, size, nBlockIdx);
  return code;

_err:
  tsdbError("vgId:%d, write block idx failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
432 433 434 435 436 437 438
/*
 * Encode `mBlock` and append it to the head file at its current logical size.
 * On success `pBlockIdx` records where the encoded map was placed (offset and
 * size) and the head file size is advanced. `mBlock` must be non-empty.
 *
 * Returns 0 on success, or the error from tRealloc()/tsdbWriteFile().
 */
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, SBlockIdx *pBlockIdx) {
  SHeadFile *pHead = &pWriter->fHead;

  ASSERT(mBlock->nItem > 0);

  // first pass with a NULL destination yields the encoded length
  int64_t szEncoded = tPutMapData(NULL, mBlock);
  int32_t code = tRealloc(&pWriter->aBuf[0], szEncoded);
  if (code == 0) {
    // second pass performs the actual encoding, then the bytes are appended
    (void)tPutMapData(pWriter->aBuf[0], mBlock);
    code = tsdbWriteFile(pWriter->pHeadFD, pHead->size, pWriter->aBuf[0], szEncoded);
  }

  if (code != 0) {
    tsdbError("vgId:%d, write block failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
    return code;
  }

  // publish the location of the freshly written map
  pBlockIdx->offset = pHead->size;
  pBlockIdx->size = szEncoded;
  pHead->size += szEncoded;

  tsdbTrace("vgId:%d, write block, file ID:%d commit ID:%d suid:%" PRId64 " uid:%" PRId64 " offset:%" PRId64
            " size:%" PRId64 " nItem:%d",
            TD_VID(pWriter->pTsdb->pVnode), pWriter->wSet.fid, pHead->commitID, pBlockIdx->suid, pBlockIdx->uid,
            pBlockIdx->offset, pBlockIdx->size, mBlock->nItem);
  return code;
}

H
Hongze Cheng 已提交
468
/*
 * Encode every SSstBlk in `aSstBlk` and append the result to the last sst
 * file at its current logical size.
 *
 * On success the sst file's `offset` is set to where the blocks start and its
 * `size` is advanced by the encoded length. With an empty array nothing is
 * written and `offset` is set to the current size.
 *
 * Returns 0 on success, or the error from tRealloc()/tsdbWriteFile().
 */
int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk) {
  int32_t   code = 0;
  SSstFile *pSstFile = &pWriter->fSst[pWriter->wSet.nSstF - 1];
  int64_t   size = 0;  // stays 0 on the empty path so the trace below is well-defined
  int64_t   n = 0;
  int32_t   nSstBlk = (int32_t)taosArrayGetSize(aSstBlk);

  // check
  if (nSstBlk == 0) {
    pSstFile->offset = pSstFile->size;
    goto _exit;
  }

  // size: a NULL destination makes tPutSstBlk return the encoded size only
  for (int32_t iBlockL = 0; iBlockL < nSstBlk; iBlockL++) {
    size += tPutSstBlk(NULL, taosArrayGet(aSstBlk, iBlockL));
  }

  // alloc
  code = tRealloc(&pWriter->aBuf[0], size);
  if (code) goto _err;

  // encode
  for (int32_t iBlockL = 0; iBlockL < nSstBlk; iBlockL++) {
    n += tPutSstBlk(pWriter->aBuf[0] + n, taosArrayGet(aSstBlk, iBlockL));
  }

  // write
  code = tsdbWriteFile(pWriter->pSstFD, pSstFile->size, pWriter->aBuf[0], size);
  if (code) goto _err;

  // update
  pSstFile->offset = pSstFile->size;
  pSstFile->size += size;

_exit:
  tsdbTrace("vgId:%d tsdb write sst block, loffset:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode),
            pSstFile->offset, size);
  return code;

_err:
  tsdbError("vgId:%d tsdb write blockl failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
514
/*
 * Compute the per-column aggregates (SMA) of `pBlockData`, encode them back
 * to back into the writer's scratch buffer and append them to the sma file.
 *
 * Columns with smaOn unset or with a variable-length type are skipped.
 * `pSmaInfo` is reset to {offset 0, size 0} up front; its size grows as
 * aggregates are encoded, and its offset is set only after a successful
 * write. Nothing is written when no column qualifies.
 *
 * Returns 0 on success, or the error from tRealloc()/tsdbWriteFile().
 */
static int32_t tsdbWriteBlockSma(SDataFWriter *pWriter, SBlockData *pBlockData, SSmaInfo *pSmaInfo) {
  int32_t code = 0;

  pSmaInfo->offset = 0;
  pSmaInfo->size = 0;

  // encode one aggregate per eligible column
  for (int32_t iCol = 0; iCol < taosArrayGetSize(pBlockData->aIdx); iCol++) {
    SColData *pCol = tBlockDataGetColDataByIdx(pBlockData, iCol);

    if (pCol->smaOn && !IS_VAR_DATA_TYPE(pCol->type)) {
      SColumnDataAgg agg;
      tsdbCalcColDataSMA(pCol, &agg);

      // grow the buffer by the encoded size, then append the aggregate
      code = tRealloc(&pWriter->aBuf[0], pSmaInfo->size + tPutColumnDataAgg(NULL, &agg));
      if (code) goto _err;
      pSmaInfo->size += tPutColumnDataAgg(pWriter->aBuf[0] + pSmaInfo->size, &agg);
    }
  }

  // write
  if (pSmaInfo->size) {
    code = tRealloc(&pWriter->aBuf[0], pSmaInfo->size);
    if (code) goto _err;

    code = tsdbWriteFile(pWriter->pSmaFD, pWriter->fSma.size, pWriter->aBuf[0], pSmaInfo->size);
    if (code) goto _err;

    pSmaInfo->offset = pWriter->fSma.size;
    pWriter->fSma.size += pSmaInfo->size;
  }

  return code;

_err:
  tsdbError("vgId:%d tsdb write block sma failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
553 554
/*
 * Compress `pBlockData` and append it to the data file, or to the last sst
 * file when `toLast` is non-zero.
 *
 * tCmprBlockData() fills the writer's four scratch buffers; they are written
 * in the order 3, 2, 1, 0. Buffers 3 and 2 are always passed to
 * tsdbWriteFile(); buffers 1 and 0 only when non-empty. `pBlkInfo` receives
 * the starting offset, szKey (sizes of buffers 3 + 2) and szBlock (all four).
 * The target file size is then advanced by szBlock and, when `pSmaInfo` is
 * non-NULL, the block's SMA is written as well.
 *
 * Returns 0 on success or the first error encountered (which is logged).
 */
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo,
                           int8_t cmprAlg, int8_t toLast) {
  int32_t code = 0;
  int32_t aBufN[4] = {0};

  ASSERT(pBlockData->nRow > 0);

  // the block starts at the current end of whichever file it goes to
  pBlkInfo->offset = toLast ? pWriter->fSst[pWriter->wSet.nSstF - 1].size : pWriter->fData.size;
  pBlkInfo->szBlock = 0;
  pBlkInfo->szKey = 0;

  code = tCmprBlockData(pBlockData, cmprAlg, NULL, NULL, pWriter->aBuf, aBufN);
  if (code) goto _err;

  // write =================
  STsdbFD *pTargetFD = toLast ? pWriter->pSstFD : pWriter->pDataFD;

  pBlkInfo->szKey = aBufN[3] + aBufN[2];
  pBlkInfo->szBlock = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3];

  int64_t cursor = pBlkInfo->offset;
  for (int32_t iBuf = 3; iBuf >= 0; iBuf--) {
    // buffers 3 and 2 are written unconditionally, 1 and 0 only if non-empty
    if (iBuf < 2 && aBufN[iBuf] == 0) continue;

    code = tsdbWriteFile(pTargetFD, cursor, pWriter->aBuf[iBuf], aBufN[iBuf]);
    if (code) goto _err;
    cursor += aBufN[iBuf];
  }

  // update info
  if (toLast) {
    pWriter->fSst[pWriter->wSet.nSstF - 1].size += pBlkInfo->szBlock;
  } else {
    pWriter->fData.size += pBlkInfo->szBlock;
  }

  // ================= SMA ====================
  if (pSmaInfo) {
    code = tsdbWriteBlockSma(pWriter, pBlockData, pSmaInfo);
    if (code) goto _err;
  }

  tsdbTrace("vgId:%d tsdb write block data, suid:%" PRId64 " uid:%" PRId64 " nRow:%d, offset:%" PRId64 " size:%d",
            TD_VID(pWriter->pTsdb->pVnode), pBlockData->suid, pBlockData->uid, pBlockData->nRow, pBlkInfo->offset,
            pBlkInfo->szBlock);
  return code;

_err:
  tsdbError("vgId:%d tsdb write block data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  return code;
}
H
Hongze Cheng 已提交
620

H
Hongze Cheng 已提交
621 622 623 624 625 626 627 628
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
  int32_t   code = 0;
  int64_t   n;
  int64_t   size;
  TdFilePtr pOutFD = NULL;  // TODO
  TdFilePtr PInFD = NULL;   // TODO
  char      fNameFrom[TSDB_FILENAME_LEN];
  char      fNameTo[TSDB_FILENAME_LEN];
H
Hongze Cheng 已提交
629

H
Hongze Cheng 已提交
630 631 632
  // head
  tsdbHeadFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pHeadF, fNameFrom);
  tsdbHeadFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pHeadF, fNameTo);
H
Hongze Cheng 已提交
633

H
Hongze Cheng 已提交
634 635 636
  pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (pOutFD == NULL) {
    code = TAOS_SYSTEM_ERROR(errno);
H
Hongze Cheng 已提交
637 638 639
    goto _err;
  }

H
Hongze Cheng 已提交
640 641
  PInFD = taosOpenFile(fNameFrom, TD_FILE_READ);
  if (PInFD == NULL) {
H
Hongze Cheng 已提交
642 643 644 645
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

H
Hongze Cheng 已提交
646
  n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->pHeadF->size);
H
Hongze Cheng 已提交
647 648 649 650
  if (n < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
H
Hongze Cheng 已提交
651 652
  taosCloseFile(&pOutFD);
  taosCloseFile(&PInFD);
H
Hongze Cheng 已提交
653 654

  // data
H
Hongze Cheng 已提交
655 656
  tsdbDataFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pDataF, fNameFrom);
  tsdbDataFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pDataF, fNameTo);
H
Hongze Cheng 已提交
657

H
Hongze Cheng 已提交
658 659
  pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (pOutFD == NULL) {
H
Hongze Cheng 已提交
660 661 662 663
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

H
Hongze Cheng 已提交
664 665
  PInFD = taosOpenFile(fNameFrom, TD_FILE_READ);
  if (PInFD == NULL) {
H
Hongze Cheng 已提交
666 667 668
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
H
Hongze Cheng 已提交
669 670

  n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->pDataF->size);
H
Hongze Cheng 已提交
671 672 673 674
  if (n < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
H
Hongze Cheng 已提交
675 676
  taosCloseFile(&pOutFD);
  taosCloseFile(&PInFD);
H
Hongze Cheng 已提交
677

H
Hongze Cheng 已提交
678 679 680
  // sst
  tsdbSstFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->aSstF[0], fNameFrom);
  tsdbSstFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->aSstF[0], fNameTo);
H
Hongze Cheng 已提交
681

H
Hongze Cheng 已提交
682 683 684 685
  pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (pOutFD == NULL) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
H
Hongze Cheng 已提交
686 687
  }

H
Hongze Cheng 已提交
688 689
  PInFD = taosOpenFile(fNameFrom, TD_FILE_READ);
  if (PInFD == NULL) {
H
Hongze Cheng 已提交
690 691 692 693
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

H
Hongze Cheng 已提交
694 695
  n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->aSstF[0]->size);
  if (n < 0) {
H
Hongze Cheng 已提交
696 697 698
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
H
Hongze Cheng 已提交
699 700
  taosCloseFile(&pOutFD);
  taosCloseFile(&PInFD);
H
Hongze Cheng 已提交
701

H
Hongze Cheng 已提交
702 703 704 705 706 707
  // sma
  tsdbSmaFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pSmaF, fNameFrom);
  tsdbSmaFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pSmaF, fNameTo);

  pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (pOutFD == NULL) {
H
Hongze Cheng 已提交
708 709 710 711
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

H
Hongze Cheng 已提交
712 713
  PInFD = taosOpenFile(fNameFrom, TD_FILE_READ);
  if (PInFD == NULL) {
H
Hongze Cheng 已提交
714 715 716 717
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

H
Hongze Cheng 已提交
718 719 720 721
  n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->pSmaF->size);
  if (n < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
H
Hongze Cheng 已提交
722
  }
H
Hongze Cheng 已提交
723 724 725
  taosCloseFile(&pOutFD);
  taosCloseFile(&PInFD);

H
Hongze Cheng 已提交
726 727 728
  return code;

_err:
H
Hongze Cheng 已提交
729
  tsdbError("vgId:%d, tsdb DFileSet copy failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
730 731 732
  return code;
}

H
Hongze Cheng 已提交
733 734 735 736
// SDataFReader ====================================================
/*
 * Allocate an SDataFReader for the file set `pSet` and open its head, data,
 * sma and sst files read-only.
 *
 * ppReader: receives the new reader on success and NULL on failure.
 * pTsdb:    tsdb instance; stored in the reader and used to build file names.
 * pSet:     file set to open; the pointer itself is stored in the reader.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the reader cannot be
 * allocated, or the code returned by tsdbOpenFile. On failure every file that
 * was already opened is closed and the reader is freed, so nothing leaks.
 */
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet) {
  int32_t       code = 0;
  SDataFReader *pReader;
  int32_t       szPage = TSDB_DEFAULT_PAGE_SIZE;
  char          fname[TSDB_FILENAME_LEN];

  // alloc (zero-filled, so every file handle starts out NULL)
  pReader = (SDataFReader *)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pReader->pTsdb = pTsdb;
  pReader->pSet = pSet;

  // head
  tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
  code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pHeadFD);
  if (code) goto _err;

  // data
  tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
  code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pDataFD);
  if (code) goto _err;

  // sma
  tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
  code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pSmaFD);
  if (code) goto _err;

  // sst
  for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) {
    tsdbSstFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSstF[iSst], fname);
    code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->aSstFD[iSst]);
    if (code) goto _err;
  }

  *ppReader = pReader;
  return code;

_err:
  tsdbError("vgId:%d, tsdb data file reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  if (pReader) {
    // Release whatever was opened before the failure. tsdbOpenFile resets its
    // output to NULL before doing anything else, so a non-NULL handle here is
    // one that was opened successfully.
    for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) {
      if (pReader->aSstFD[iSst]) tsdbCloseFile(&pReader->aSstFD[iSst]);
    }
    if (pReader->pSmaFD) tsdbCloseFile(&pReader->pSmaFD);
    if (pReader->pDataFD) tsdbCloseFile(&pReader->pDataFD);
    if (pReader->pHeadFD) tsdbCloseFile(&pReader->pHeadFD);
    taosMemoryFree(pReader);
  }
  *ppReader = NULL;
  return code;
}

/*
 * Close every file held by a data file reader, release its scratch buffers
 * and free the reader itself.
 *
 * ppReader: address of the reader pointer; set to NULL once the reader has
 *           been freed. A NULL reader is accepted and is a no-op.
 *
 * Always returns 0: none of the steps below reports an error.
 */
int32_t tsdbDataFReaderClose(SDataFReader **ppReader) {
  int32_t       code = 0;
  SDataFReader *pReader = *ppReader;

  if (pReader == NULL) return code;

  // head
  tsdbCloseFile(&pReader->pHeadFD);

  // data
  tsdbCloseFile(&pReader->pDataFD);

  // sma
  tsdbCloseFile(&pReader->pSmaFD);

  // sst
  for (int32_t iSst = 0; iSst < pReader->pSet->nSstF; iSst++) {
    tsdbCloseFile(&pReader->aSstFD[iSst]);
  }

  // scratch buffers
  for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(uint8_t *); iBuf++) {
    tFree(pReader->aBuf[iBuf]);
  }

  taosMemoryFree(pReader);
  *ppReader = NULL;
  return code;
}

H
Hongze Cheng 已提交
810
/*
 * Load the block index section of the head file into `aBlockIdx`.
 *
 * The section spans [pHeadF->offset, pHeadF->size) of the head file. The
 * output array is cleared first; an empty section leaves it empty and
 * returns 0.
 *
 * Returns 0 on success, the code from tRealloc/tsdbReadFile on failure, or
 * TSDB_CODE_OUT_OF_MEMORY when an entry cannot be appended to the array.
 */
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) {
  int32_t    code = 0;
  SHeadFile *pHead = pReader->pSet->pHeadF;
  int64_t    start = pHead->offset;
  int64_t    nBytes = pHead->size - start;
  int64_t    nDecoded;

  taosArrayClear(aBlockIdx);
  if (nBytes == 0) return code;

  // size the scratch buffer, then pull the whole section into it
  code = tRealloc(&pReader->aBuf[0], nBytes);
  if (code == 0) {
    code = tsdbReadFile(pReader->pHeadFD, start, pReader->aBuf[0], nBytes);
  }
  if (code) goto _err;

  // decode entries back to back until the section is consumed
  for (nDecoded = 0; nDecoded < nBytes;) {
    SBlockIdx entry;

    nDecoded += tGetBlockIdx(pReader->aBuf[0] + nDecoded, &entry);
    if (taosArrayPush(aBlockIdx, &entry) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }
  ASSERT(nDecoded == nBytes);

  return code;

_err:
  tsdbError("vgId:%d, read block idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
847
/*
 * Load the SSstBlk index of the `iSst`-th sst file into `aSstBlk`.
 *
 * The index spans [aSstF[iSst]->offset, aSstF[iSst]->size) of that file. The
 * output array is cleared first; an empty index leaves it empty and
 * returns 0.
 *
 * Returns 0 on success, the code from tRealloc/tsdbReadFile on failure, or
 * TSDB_CODE_OUT_OF_MEMORY when an entry cannot be appended to the array.
 */
int32_t tsdbReadSstBlk(SDataFReader *pReader, int32_t iSst, SArray *aSstBlk) {
  int32_t   code = 0;
  SSstFile *pSst = pReader->pSet->aSstF[iSst];
  int64_t   start = pSst->offset;
  int64_t   nBytes = pSst->size - start;
  int64_t   nDecoded;

  taosArrayClear(aSstBlk);
  if (nBytes == 0) return code;

  // size the scratch buffer, then pull the whole index into it
  code = tRealloc(&pReader->aBuf[0], nBytes);
  if (code == 0) {
    code = tsdbReadFile(pReader->aSstFD[iSst], start, pReader->aBuf[0], nBytes);
  }
  if (code) goto _err;

  // decode entries back to back until the index is consumed
  for (nDecoded = 0; nDecoded < nBytes;) {
    SSstBlk entry;

    nDecoded += tGetSstBlk(pReader->aBuf[0] + nDecoded, &entry);
    if (taosArrayPush(aSstBlk, &entry) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }
  ASSERT(nDecoded == nBytes);

  return code;

_err:
  tsdbError("vgId:%d read sst blk failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
884 885 886 887
/*
 * Read the map data that `pBlockIdx` points at in the head file and decode
 * it into `mBlock`.
 *
 * Returns 0 on success, the code from tRealloc/tsdbReadFile on failure, or
 * TSDB_CODE_OUT_OF_MEMORY when tGetMapData reports a negative length.
 */
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBlock) {
  int32_t code = 0;
  int64_t start = pBlockIdx->offset;
  int64_t nBytes = pBlockIdx->size;
  int64_t nDecoded;

  // size the scratch buffer, then pull the encoded map into it
  code = tRealloc(&pReader->aBuf[0], nBytes);
  if (code == 0) {
    code = tsdbReadFile(pReader->pHeadFD, start, pReader->aBuf[0], nBytes);
  }
  if (code) goto _err;

  // decode; a negative result is reported as out-of-memory
  nDecoded = tGetMapData(pReader->aBuf[0], mBlock);
  if (nDecoded < 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  ASSERT(nDecoded == nBytes);

  return code;

_err:
  tsdbError("vgId:%d, read block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  return code;
}
H
Hongze Cheng 已提交
911

H
Hongze Cheng 已提交
912 913 914
/*
 * Read the column aggregates described by `pDataBlk->smaInfo` from the sma
 * file into `aColumnDataAgg`.
 *
 * The caller must pass a block whose smaInfo.size is positive (asserted).
 * The output array is cleared before decoding.
 *
 * Returns 0 on success, the code from tRealloc/tsdbReadFile on failure, or
 * TSDB_CODE_OUT_OF_MEMORY when an entry cannot be appended to the array.
 */
int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pDataBlk, SArray *aColumnDataAgg) {
  int32_t   code = 0;
  SSmaInfo *pSma = &pDataBlk->smaInfo;
  int32_t   nDecoded;

  ASSERT(pSma->size > 0);

  taosArrayClear(aColumnDataAgg);

  // size the scratch buffer, then pull the encoded aggregates into it
  code = tRealloc(&pReader->aBuf[0], pSma->size);
  if (code == 0) {
    code = tsdbReadFile(pReader->pSmaFD, pSma->offset, pReader->aBuf[0], pSma->size);
  }
  if (code) goto _err;

  // decode entries back to back until the region is consumed
  for (nDecoded = 0; nDecoded < pSma->size;) {
    SColumnDataAgg agg;

    nDecoded += tGetColumnDataAgg(pReader->aBuf[0] + nDecoded, &agg);
    if (taosArrayPush(aColumnDataAgg, &agg) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }
  ASSERT(nDecoded == pSma->size);
  return code;

_err:
  tsdbError("vgId:%d tsdb read block sma failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  return code;
}
H
Hongze Cheng 已提交
946

H
Hongze Cheng 已提交
947
/*
 * Read one on-disk block described by `pBlkInfo` from the data file and
 * decode it into `pBlockData`.
 *
 * Steps:
 *   1. Read the key part (header + uid + version + tskey, `szKey` bytes) and
 *      decompress the uid/version/TSKEY arrays.
 *   2. If the caller asked for columns (pBlockData->aIdx is non-empty), read
 *      the SBlockCol index (`hdr.szBlkCol` bytes) and, for every requested
 *      column, either fill it with NONE/NULL values or read and decompress
 *      its payload.
 *
 * Buffer usage: aBuf[0] holds the key part and later the SBlockCol index,
 * aBuf[1] is decompression scratch and later the raw column payload, aBuf[2]
 * is scratch for column decompression. Each buffer is sized with tRealloc
 * before data is read into it, because tsdbReadFile takes the buffer pointer
 * by value and therefore cannot grow it.
 *
 * Returns 0 on success or the first error code encountered.
 */
static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo, SBlockData *pBlockData) {
  int32_t code = 0;

  tBlockDataClear(pBlockData);

  STsdbFD *pFD = pReader->pDataFD;

  // uid + version + tskey
  code = tRealloc(&pReader->aBuf[0], pBlkInfo->szKey);
  if (code) goto _err;

  code = tsdbReadFile(pFD, pBlkInfo->offset, pReader->aBuf[0], pBlkInfo->szKey);
  if (code) goto _err;

  SDiskDataHdr hdr;
  uint8_t     *p = pReader->aBuf[0] + tGetDiskDataHdr(pReader->aBuf[0], &hdr);

  ASSERT(hdr.delimiter == TSDB_FILE_DLMT);
  ASSERT(pBlockData->suid == hdr.suid);
  ASSERT(pBlockData->uid == hdr.uid);

  pBlockData->nRow = hdr.nRow;

  // uid: only stored when the header's uid is 0
  if (hdr.uid == 0) {
    ASSERT(hdr.szUid);
    code = tsdbDecmprData(p, hdr.szUid, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aUid,
                          sizeof(int64_t) * hdr.nRow, &pReader->aBuf[1]);
    if (code) goto _err;
  } else {
    ASSERT(!hdr.szUid);
  }
  p += hdr.szUid;

  // version
  code = tsdbDecmprData(p, hdr.szVer, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aVersion,
                        sizeof(int64_t) * hdr.nRow, &pReader->aBuf[1]);
  if (code) goto _err;
  p += hdr.szVer;

  // TSKEY
  code = tsdbDecmprData(p, hdr.szKey, TSDB_DATA_TYPE_TIMESTAMP, hdr.cmprAlg, (uint8_t **)&pBlockData->aTSKEY,
                        sizeof(TSKEY) * hdr.nRow, &pReader->aBuf[1]);
  if (code) goto _err;
  p += hdr.szKey;

  ASSERT(p - pReader->aBuf[0] == pBlkInfo->szKey);

  // read and decode columns
  if (taosArrayGetSize(pBlockData->aIdx) == 0) goto _exit;

  if (hdr.szBlkCol > 0) {
    int64_t offset = pBlkInfo->offset + pBlkInfo->szKey;

    // aBuf[0] was sized for szKey bytes only; make room for the SBlockCol
    // index before reading it (`p` is not used past this point).
    code = tRealloc(&pReader->aBuf[0], hdr.szBlkCol);
    if (code) goto _err;

    code = tsdbReadFile(pFD, offset, pReader->aBuf[0], hdr.szBlkCol);
    if (code) goto _err;
  }

  SBlockCol  blockCol = {.cid = 0};
  SBlockCol *pBlockCol = &blockCol;
  int32_t    n = 0;

  for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
    SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);

    // advance through the on-disk column index until it reaches or passes the
    // requested column id; pBlockCol becomes NULL once the index is exhausted
    while (pBlockCol && pBlockCol->cid < pColData->cid) {
      if (n < hdr.szBlkCol) {
        n += tGetBlockCol(pReader->aBuf[0] + n, pBlockCol);
      } else {
        ASSERT(n == hdr.szBlkCol);
        pBlockCol = NULL;
      }
    }

    if (pBlockCol == NULL || pBlockCol->cid > pColData->cid) {
      // column is not stored in this block: add a lot of NONE
      for (int32_t iRow = 0; iRow < hdr.nRow; iRow++) {
        code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
        if (code) goto _err;
      }
    } else {
      ASSERT(pBlockCol->type == pColData->type);
      ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE);

      if (pBlockCol->flag == HAS_NULL) {
        // add a lot of NULL
        for (int32_t iRow = 0; iRow < hdr.nRow; iRow++) {
          code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type));
          if (code) goto _err;
        }
      } else {
        // decode from binary
        int64_t offset = pBlkInfo->offset + pBlkInfo->szKey + hdr.szBlkCol + pBlockCol->offset;
        int32_t size = pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue;

        // aBuf[1] has only been used as decompression scratch so far; size it
        // for the raw column payload before reading into it
        code = tRealloc(&pReader->aBuf[1], size);
        if (code) goto _err;

        code = tsdbReadFile(pFD, offset, pReader->aBuf[1], size);
        if (code) goto _err;

        code = tsdbDecmprColData(pReader->aBuf[1], pBlockCol, hdr.cmprAlg, hdr.nRow, pColData, &pReader->aBuf[2]);
        if (code) goto _err;
      }
    }
  }

_exit:
  return code;

_err:
  tsdbError("vgId:%d tsdb read block data impl failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  return code;
}
H
Hongze Cheng 已提交
1057

H
Hongze Cheng 已提交
1058 1059
/*
 * Read a data block, merging all of its sub-blocks, into `pBlockData`.
 *
 * Sub-block 0 is decoded directly into `pBlockData`. Each further sub-block
 * is decoded into a temporary, the current content of `pBlockData` is copied
 * into a second temporary, and the two are merged back into `pBlockData`.
 *
 * Returns 0 on success or the first error code encountered. Both temporaries
 * are destroyed on every path after they have been created, including the
 * path where creating the second one fails.
 */
int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) {
  int32_t code = 0;

  code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[0], pBlockData);
  if (code) goto _err;

  if (pDataBlk->nSubBlock > 1) {
    SBlockData bData1;
    SBlockData bData2;

    // create
    code = tBlockDataCreate(&bData1);
    if (code) goto _err;

    code = tBlockDataCreate(&bData2);
    if (code) {
      // bData1 already exists at this point and must not be leaked
      tBlockDataDestroy(&bData1, 1);
      goto _err;
    }

    // init
    tBlockDataInitEx(&bData1, pBlockData);
    tBlockDataInitEx(&bData2, pBlockData);

    for (int32_t iSubBlock = 1; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) {
      // next sub-block -> bData1
      code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[iSubBlock], &bData1);
      if (code) break;

      // accumulated result -> bData2
      code = tBlockDataCopy(pBlockData, &bData2);
      if (code) break;

      // merge(bData1, bData2) -> pBlockData
      code = tBlockDataMerge(&bData1, &bData2, pBlockData);
      if (code) break;
    }

    // single cleanup point for both the success and the failure path
    tBlockDataDestroy(&bData1, 1);
    tBlockDataDestroy(&bData2, 1);
    if (code) goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d tsdb read data block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  return code;
}
H
Hongze Cheng 已提交
1111

H
Hongze Cheng 已提交
1112
/*
 * Read the block described by `pSstBlk->bInfo` from the `iSst`-th sst file
 * and decompress it into `pBlockData`.
 *
 * aBuf[0] receives the raw block; aBuf[1] is handed to tDecmprBlockData.
 *
 * Returns 0 on success, otherwise the code of the first step that failed
 * (tRealloc, tsdbReadFile or tDecmprBlockData).
 */
int32_t tsdbReadSstBlock(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk, SBlockData *pBlockData) {
  int32_t code;

  // each step runs only if the previous one succeeded
  code = tRealloc(&pReader->aBuf[0], pSstBlk->bInfo.szBlock);
  if (code == 0) {
    code = tsdbReadFile(pReader->aSstFD[iSst], pSstBlk->bInfo.offset, pReader->aBuf[0], pSstBlk->bInfo.szBlock);
  }
  if (code == 0) {
    code = tDecmprBlockData(pReader->aBuf[0], pSstBlk->bInfo.szBlock, pBlockData, &pReader->aBuf[1]);
  }

  if (code) {
    tsdbError("vgId:%d tsdb read sst block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  }
  return code;
}

// SDelFWriter ====================================================
/*
 * Create a writer for the del file described by `pFile`.
 *
 * The file is opened (created if missing) for read/write and a zero-filled
 * header of TSDB_FHDR_SIZE bytes is written at offset 0. The writer's copy of
 * the file descriptor then starts with size = TSDB_FHDR_SIZE and offset = 0.
 *
 * ppWriter: receives the new writer on success and NULL on failure.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the writer cannot be
 * allocated, or the code from tsdbOpenFile/tsdbWriteFile. On failure the file
 * handle (if it was opened) is closed and the writer is freed.
 */
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb) {
  int32_t      code = 0;
  char         fname[TSDB_FILENAME_LEN];
  uint8_t      hdr[TSDB_FHDR_SIZE] = {0};
  SDelFWriter *pDelFWriter;

  // alloc
  pDelFWriter = (SDelFWriter *)taosMemoryCalloc(1, sizeof(*pDelFWriter));
  if (pDelFWriter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  pDelFWriter->pTsdb = pTsdb;
  pDelFWriter->fDel = *pFile;

  tsdbDelFileName(pTsdb, pFile, fname);
  code =
      tsdbOpenFile(fname, TSDB_DEFAULT_PAGE_SIZE, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE, &pDelFWriter->pWriteH);
  if (code) goto _err;

  // update header (placeholder; see tsdbUpdateDelFileHdr for the real content)
  code = tsdbWriteFile(pDelFWriter->pWriteH, 0, hdr, TSDB_FHDR_SIZE);
  if (code) goto _err;

  pDelFWriter->fDel.size = TSDB_FHDR_SIZE;
  pDelFWriter->fDel.offset = 0;

  *ppWriter = pDelFWriter;
  return code;

_err:
  tsdbError("vgId:%d, failed to open del file writer since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  if (pDelFWriter) {
    // pWriteH is NULL unless tsdbOpenFile succeeded (calloc + the reset that
    // tsdbOpenFile performs on its output before anything else)
    if (pDelFWriter->pWriteH) tsdbCloseFile(&pDelFWriter->pWriteH);
    taosMemoryFree(pDelFWriter);
  }
  *ppWriter = NULL;
  return code;
}

/*
 * Close a del file writer: optionally fsync the file, close it, release the
 * writer's buffers and free the writer.
 *
 * ppWriter: address of the writer pointer; set to NULL after a successful
 *           close. The writer must not be NULL (it is dereferenced
 *           unconditionally).
 * sync:     non-zero to fsync the file before closing it.
 *
 * Returns 0 on success. If the fsync fails its code is returned and the
 * writer is left open and untouched.
 */
int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync) {
  int32_t      code = 0;
  SDelFWriter *pDelFWriter = *ppWriter;
  STsdb       *pTsdb = pDelFWriter->pTsdb;
  int32_t      iBuf = 0;

  // flush to disk first when requested; bail out without closing on failure
  if (sync) {
    code = tsdbFsyncFile(pDelFWriter->pWriteH);
    if (code) {
      tsdbError("vgId:%d, failed to close del file writer since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
      return code;
    }
  }

  tsdbCloseFile(&pDelFWriter->pWriteH);

  // release every scratch buffer owned by the writer
  while (iBuf < sizeof(pDelFWriter->aBuf) / sizeof(uint8_t *)) {
    tFree(pDelFWriter->aBuf[iBuf]);
    iBuf++;
  }
  taosMemoryFree(pDelFWriter);

  *ppWriter = NULL;
  return code;
}

/*
 * Encode every element of `aDelData` with tPutDelData and append the result
 * at the current end of the del file.
 *
 * On success `pDelIdx->offset`/`pDelIdx->size` describe the region just
 * written and the writer's tracked file size grows by the same amount.
 *
 * Returns 0 on success or the code from tRealloc/tsdbWriteFile.
 */
int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, SDelIdx *pDelIdx) {
  int32_t code = 0;
  int64_t nTotal = 0;
  int64_t nPut = 0;
  int32_t iItem;

  // first pass: a NULL destination only measures the encoded length
  iItem = 0;
  while (iItem < taosArrayGetSize(aDelData)) {
    nTotal += tPutDelData(NULL, taosArrayGet(aDelData, iItem));
    iItem++;
  }

  code = tRealloc(&pWriter->aBuf[0], nTotal);
  if (code) goto _err;

  // second pass: encode into the buffer
  iItem = 0;
  while (iItem < taosArrayGetSize(aDelData)) {
    nPut += tPutDelData(pWriter->aBuf[0] + nPut, taosArrayGet(aDelData, iItem));
    iItem++;
  }
  ASSERT(nPut == nTotal);

  // append at the current end of file
  code = tsdbWriteFile(pWriter->pWriteH, pWriter->fDel.size, pWriter->aBuf[0], nTotal);
  if (code) goto _err;

  // publish the location, then advance the tracked file size
  pDelIdx->offset = pWriter->fDel.size;
  pDelIdx->size = nTotal;
  pWriter->fDel.size += nTotal;

  return code;

_err:
  tsdbError("vgId:%d, failed to write del data since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  return code;
}

/*
 * Encode every element of `aDelIdx` with tPutDelIdx and append the result at
 * the current end of the del file.
 *
 * On success the writer's `fDel.offset` is set to where the index starts and
 * `fDel.size` grows by the encoded length.
 *
 * Returns 0 on success or the code from tRealloc/tsdbWriteFile.
 */
int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SArray *aDelIdx) {
  int32_t code = 0;
  int64_t size;
  int64_t n;

  // prepare: a NULL destination only measures the encoded length
  size = 0;
  for (int32_t iDelIdx = 0; iDelIdx < taosArrayGetSize(aDelIdx); iDelIdx++) {
    size += tPutDelIdx(NULL, taosArrayGet(aDelIdx, iDelIdx));
  }

  // alloc
  code = tRealloc(&pWriter->aBuf[0], size);
  if (code) goto _err;

  // build
  n = 0;
  for (int32_t iDelIdx = 0; iDelIdx < taosArrayGetSize(aDelIdx); iDelIdx++) {
    n += tPutDelIdx(pWriter->aBuf[0] + n, taosArrayGet(aDelIdx, iDelIdx));
  }
  ASSERT(n == size);

  // write: append at the current end of file
  code = tsdbWriteFile(pWriter->pWriteH, pWriter->fDel.size, pWriter->aBuf[0], size);
  if (code) goto _err;

  // update: the index starts where the file previously ended
  pWriter->fDel.offset = pWriter->fDel.size;
  pWriter->fDel.size += size;

  return code;

_err:
  tsdbError("vgId:%d, write del idx failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  return code;
}

/*
 * Rewrite the del file header with the writer's current SDelFile state.
 *
 * The header is a zero-filled TSDB_FHDR_SIZE buffer into which tPutDelFile
 * encodes `pWriter->fDel`; it is written at file offset 0.
 *
 * Returns 0 on success or the code from tsdbWriteFile.
 */
int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter) {
  int32_t code = 0;
  // uint8_t to match the header buffer used by tsdbDelFWriterOpen and the
  // byte buffers passed to tsdbWriteFile elsewhere in this file
  uint8_t hdr[TSDB_FHDR_SIZE] = {0};
  int64_t size = TSDB_FHDR_SIZE;

  // build
  tPutDelFile(hdr, &pWriter->fDel);

  // write
  code = tsdbWriteFile(pWriter->pWriteH, 0, hdr, size);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d, update del file hdr failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  return code;
}
H
Hongze Cheng 已提交
1294 1295
// SDelFReader ====================================================
struct SDelFReader {
H
Hongze Cheng 已提交
1296 1297 1298
  STsdb   *pTsdb;
  SDelFile fDel;
  STsdbFD *pReadH;
H
Hongze Cheng 已提交
1299 1300
  uint8_t *aBuf[1];
};
H
Hongze Cheng 已提交
1301

H
Hongze Cheng 已提交
1302 1303 1304 1305 1306
/*
 * Create a reader for the del file described by `pFile` and open the file
 * read-only.
 *
 * ppReader: receives the new reader on success and NULL on failure.
 * pFile:    del file descriptor; copied by value into the reader.
 * pTsdb:    tsdb instance; stored in the reader and used to build the name.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the reader cannot be
 * allocated, or the code from tsdbOpenFile. The reader is freed on failure.
 */
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb) {
  int32_t      code = 0;
  char         fname[TSDB_FILENAME_LEN];
  SDelFReader *pDelFReader;

  // alloc
  pDelFReader = (SDelFReader *)taosMemoryCalloc(1, sizeof(*pDelFReader));
  if (pDelFReader == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // open impl
  pDelFReader->pTsdb = pTsdb;
  pDelFReader->fDel = *pFile;

  tsdbDelFileName(pTsdb, pFile, fname);
  code = tsdbOpenFile(fname, TSDB_DEFAULT_PAGE_SIZE, TD_FILE_READ, &pDelFReader->pReadH);
  if (code) goto _err;

  *ppReader = pDelFReader;
  return code;

_err:
  tsdbError("vgId:%d, del file reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  // The only failure after allocation is the open itself, so there is no file
  // handle to close; just release the reader struct.
  if (pDelFReader) taosMemoryFree(pDelFReader);
  *ppReader = NULL;
  return code;
}

H
Hongze Cheng 已提交
1332 1333 1334
/*
 * Close a del file reader: close its file, release its buffers and free it.
 *
 * ppReader: address of the reader pointer; always set to NULL on return.
 *           A NULL reader is accepted.
 *
 * Always returns 0.
 */
int32_t tsdbDelFReaderClose(SDelFReader **ppReader) {
  int32_t      code = 0;
  SDelFReader *pDelFReader = *ppReader;

  if (pDelFReader != NULL) {
    int32_t iBuf = 0;

    tsdbCloseFile(&pDelFReader->pReadH);

    // release every scratch buffer owned by the reader
    while (iBuf < sizeof(pDelFReader->aBuf) / sizeof(uint8_t *)) {
      tFree(pDelFReader->aBuf[iBuf]);
      iBuf++;
    }
    taosMemoryFree(pDelFReader);
  }

  *ppReader = NULL;
  return code;
}

H
Hongze Cheng 已提交
1349
/*
 * Read the del data region described by `pDelIdx` and decode it into
 * `aDelData`.
 *
 * The output array is cleared first.
 *
 * Returns 0 on success, the code from tRealloc/tsdbReadFile on failure, or
 * TSDB_CODE_OUT_OF_MEMORY when an entry cannot be appended to the array.
 */
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData) {
  int32_t code = 0;
  int64_t start = pDelIdx->offset;
  int64_t nBytes = pDelIdx->size;
  int64_t nDecoded;

  taosArrayClear(aDelData);

  // size the scratch buffer, then pull the whole region into it
  code = tRealloc(&pReader->aBuf[0], nBytes);
  if (code == 0) {
    code = tsdbReadFile(pReader->pReadH, start, pReader->aBuf[0], nBytes);
  }
  if (code) goto _err;

  // decode entries back to back until the region is consumed
  for (nDecoded = 0; nDecoded < nBytes;) {
    SDelData entry;

    nDecoded += tGetDelData(pReader->aBuf[0] + nDecoded, &entry);
    if (taosArrayPush(aDelData, &entry) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }
  ASSERT(nDecoded == nBytes);

  return code;

_err:
  tsdbError("vgId:%d, read del data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
1385
/*
 * Read the del index section of the del file and decode it into `aDelIdx`.
 *
 * The section spans [fDel.offset, fDel.size) of the file. The output array
 * is cleared first.
 *
 * Returns 0 on success, the code from tRealloc/tsdbReadFile on failure, or
 * TSDB_CODE_OUT_OF_MEMORY when an entry cannot be appended to the array.
 */
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) {
  int32_t code = 0;
  // 64-bit like `size`, matching the byte counters of the other decoders in
  // this file, so the running total cannot be narrower than what it is
  // compared against
  int64_t n;
  int64_t offset = pReader->fDel.offset;
  int64_t size = pReader->fDel.size - offset;

  taosArrayClear(aDelIdx);

  // alloc
  code = tRealloc(&pReader->aBuf[0], size);
  if (code) goto _err;

  // read
  code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size);
  if (code) goto _err;

  // decode entries back to back until the section is consumed
  n = 0;
  while (n < size) {
    SDelIdx delIdx;

    n += tGetDelIdx(pReader->aBuf[0] + n, &delIdx);

    if (taosArrayPush(aDelIdx, &delIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }

  ASSERT(n == size);

  return code;

_err:
  tsdbError("vgId:%d, read del idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
  return code;
}